concatMap可变数量的承诺和管道结果作为下一个承诺的参数
问题描述:
我有一个可变数量的承诺,我想依次将前一个承诺的结果作为下一个承诺的参数执行它们。目前,我设法Concat的,以便他们按顺序执行它们:concatMap可变数量的承诺和管道结果作为下一个承诺的参数
const promises = [p1, p2, p3, p4, ....];
const source$ = promises.map(p => Rx.Observable.defer(p));
const combination$ = Rx.Observable.concat(...source$);
combination.subscribe((x) => console.log(x));
但我怎么能现在设法管的参数到每个承诺?我读过我可以使用concatMap。类似的东西:
Observable.from(p1).concatMap(res => p2(res)).concatMap(res => p3(res))
我想我需要以某种方式总是返回一个新的可观察里面concatMap。并且如果一些promise应该是可变的,如何链接concatMaps?
有人可以请指出我在正确的方向吗?我对整个反应式编程的新东西都很陌生,但对于我已经了解的部分来说,这非常棒!
谢谢
答
玉家伙我终于有结果,我打算有
const promises = [
(i=0) => Promise.resolve(i+2),
(i=2) => Promise.resolve(i+4),
(i=2) => Promise.resolve(i+8)
];
const initial$ = Rx.Observable.from(promises[0]());
promises.reduce((o, p) => o.concatMap(p), initial$)
.subscribe(console.log);
答
看起来你可以使用expand()
操作员递归项目从内部可观察到发射的值。
比如这里我有观测量的数组,我想补充与当前值的累积值:
const arr = [1, 2, 3, 4, 5].map(v => Observable.of(v));
Observable.of(arr)
.concatMap(arr => {
return Observable
.concat(arr[0]) // get the first item
.expand((prev, i) => {
i = i + 1; // We skipped the first Observable in `arr`
if (i === arr.length) {
return Observable.empty();
} else {
return arr[i].map(result => prev + result)
}
});
})
.subscribe(console.log);
此打印:
1
3
6
10
15
或者你只能在最后得到如果您使用的值为takeLast(1)
:
return Observable
.concat(arr[0])
.expand((prev, i) => {
...
})
.takeLast(1);
或者有更简单的解决方案N使用mergeScan
,但我不认为这是很容易理解:
Observable.of(arr)
.concatAll()
.mergeScan((acc, observable) => {
return observable.map(result => result + acc);
}, 0)
.subscribe(console.log);
结果是一样的前面的示例所示。
感谢您的答复。我根据自己的需要调整了你的答案,但是它会抛出一个错误(arr [i]不是函数)http://jsbin.com/vocomebini/edit?js,console – vacetahanna
@vacetahanna你正在把'arr '用'.map(v => Rx.Observable.defer(v))将它变成一个Observable;'这就是它抛出错误的原因。当你调用'arr [i](prev)''arr [i]'不是一个数组时,它是一个Observable。 – martin
嗯,但是当我尝试使用.of或.from时,它也不起作用。我认为我可以执行承诺,直到明确的呼叫。 – vacetahanna