如何链接RXJS中的承诺列表?
如何链接RXJS中的承诺列表?每一个承诺需要执行时,先前解决(工作todo是有状态的)。如何链接RXJS中的承诺列表?
我现在做它的方式原始的感觉:
const workTodo = []; // an array of work
const allWork = Observable.create(observer => {
const next=() => {
const currentTodo = workTodo.shift();
if (currentTodo) {
doTodoAsync(currentTodo)
.then(result => observer.onNext(result))
.then(next);
} else {
observer.onCompleted();
}
};
next();
});
我的想法是这样的:
const workTodo = []; // an array of work
const allWork = Observable
.fromArray(workTodo)
.flatMap(doTodoAsync);
但在一次基本上执行所有的承诺。
有些递归怎么样?
首先创建一个递归函数,并调用它recursiveDoToDo
:
const recursiveDoToDo = (currentTodo, index) =>
Observable
.fromPromise(doTodoAsync(currentTodo))
.map(resolved => ({resolved, index}));
上面简单的代码封装您doTodoAsync
到可观察到的,然后我们的结果映射到返回的解决承诺和的的index
数组,用于递归使用。
接下来,我们将使用.expand()
运算符递归调用recursiveDoToDo
。
recursiveDoToDo(worktodo[0], 0)
.expand(res => recursiveDoToDo(worktodo[res.index + 1], res.index + 1))
.take(worktodo.length)
所有你需要为你的递归做的只是通过1递增索引,因为.expand()
将递归一直运行下去,在.take()
运营商有没有告知观察到何时结束流,这是长你的worktodo
。
现在,你可以简单地订阅它:
recursion.subscribe(x => console.log(x));
看来你是非常接近你的尝试。
您既可以指定1最大并发数为.flatMap
,如:
Observable.fromArray(workTodo)
.flatMap(doTodoAsync, 1)
或等效采用.concatMap
代替.flatMap
:
Observable.fromArray(workTodo)
.concatMap(doTodoAsync)
我会用concatMap
,因为它感觉更地道。
UPDATE:DEMO
我测试过'concatMap',但它看起来有点不同。它并行执行'doTodoAsync',然后按照原始顺序组合结果。这里还描述:http://reactivex.io/documentation/operators/flatmap.html。 flatMap上的第二个参数作为'number'似乎不存在。 – nicojs
@nicojs不知道你到底是什么意思。用正在运行的演示更新答案。它清楚地表明,任务是一个接一个执行的,并且'mergeMap/flatMap'接受'concurrency'参数,如下所述:http://reactivex.io/rxjs/class/es6/Observable。JS〜Observable.html#实例方法,mergeMap。 –
看来工作,但它似乎也简陋。我想我更喜欢递归电话。我觉得应该有一个简单的操作员来做到这一点。 – nicojs