如何链接RXJS中的承诺列表?

如何链接RXJS中的承诺列表?

问题描述:

如何链接RXJS中的承诺列表?每一个承诺需要执行时,先前解决(工作todo是有状态的)。如何链接RXJS中的承诺列表?

我现在做它的方式原始的感觉:

const workTodo = []; // an array of work 
const allWork = Observable.create(observer => { 
    const next=() => { 
    const currentTodo = workTodo.shift(); 
    if (currentTodo) { 
     doTodoAsync(currentTodo) 
     .then(result => observer.onNext(result)) 
     .then(next); 
    } else { 
     observer.onCompleted(); 
    } 
    }; 
    next(); 
}); 

我的想法是这样的:

const workTodo = []; // an array of work 
const allWork = Observable 
        .fromArray(workTodo) 
        .flatMap(doTodoAsync); 

但在一次基本上执行所有的承诺。

有些递归怎么样?

首先创建一个递归函数,并调用它recursiveDoToDo

const recursiveDoToDo = (currentTodo, index) => 
    Observable 
     .fromPromise(doTodoAsync(currentTodo)) 
     .map(resolved => ({resolved, index})); 

上面简单的代码封装您doTodoAsync到可观察到的,然后我们的结果映射到返回的解决承诺的的index数组,用于递归使用。

接下来,我们将使用.expand()运算符递归调用recursiveDoToDo

recursiveDoToDo(worktodo[0], 0) 
    .expand(res => recursiveDoToDo(worktodo[res.index + 1], res.index + 1)) 
    .take(worktodo.length) 

所有你需要为你的递归做的只是通过1递增索引,因为.expand()将递归一直运行下去,在.take()运营商有没有告知观察到何时结束流,这是长你的worktodo

现在,你可以简单地订阅它:

recursion.subscribe(x => console.log(x)); 

这里是working JS Bin

+0

看来工作,但它似乎也简陋。我想我更喜欢递归电话。我觉得应该有一个简单的操作员来做到这一点。 – nicojs

看来你是非常接近你的尝试。

您既可以指定1最大并发数为.flatMap,如:

Observable.fromArray(workTodo) 
.flatMap(doTodoAsync, 1) 

或等效采用.concatMap代替.flatMap

Observable.fromArray(workTodo) 
.concatMap(doTodoAsync) 

我会用concatMap,因为它感觉更地道。

UPDATE:DEMO

+0

我测试过'concatMap',但它看起来有点不同。它并行执行'doTodoAsync',然后按照原始顺序组合结果。这里还描述:http://reactivex.io/documentation/operators/flatmap.html。 flatMap上的第二个参数作为'number'似乎不存在。 – nicojs

+0

@nicojs不知道你到底是什么意思。用正在运行的演示更新答案。它清楚地表明,任务是一个接一个执行的,并且'mergeMap/flatMap'接受'concurrency'参数,如下所述:http://reactivex.io/rxjs/class/es6/Observable。JS〜Observable.html#实例方法,mergeMap。 –