RxJs中提供了n种operators来支持并发,譬如map, mergeAll, switchMap等等。具体可以查看rxjs的官方文档或者搜索引擎。然而,现实中比较常见的并发请求,是需要只针对出错的需求进行重新计算。

场景一,多文件上传时,为了缩短客户端的响应时间,需要并发上传请求。同时,对其中个别出错的request,进行额外处理和再上传。

场景二,并发读取多种主数据时,其中个别request出错,应该只针对出错的request进行额外处理和再发送request。

为了满足这个需求,当使用forkJoin时,以下为一个实现代码:

forkJoin([
    of(1),
    of(2),
    of(4).pipe(map(() => { throw new Error('test'); }),catchError(err => { return of('retry ' + 4)})),
      of(3)
    ])
    .subscribe({
      next: val => {
        console.log(val);
      },
      error: err => {
        console.error(err);
      }
    });

得到的Console结果

[1, 2, "retry 4", 3]
0: 1
1: 2
2: "retry 4"
3: 3
length: 4

如果使用mergeAll来实现

of([
      1,
      2,
      () => { throw new Error('test'); },
      3
    ]).pipe(
      map(val => val),
      mergeAll(),
      catchError(() => { return of('error catched') })
    )
    .subscribe({
      next: val => {
        console.log(val);
      },
      error: err => {
        console.error(err);
      }
    });

是为之记。
Alva Chien
2020.03.28