Rxjs缓冲区执行反压
问题描述:
RxJS新手问题即将到来! 所以我有这个基本的缓冲区,它将source1和source2中的所有内容附加到数组中。在某些情况下,缓冲区被清除。Rxjs缓冲区执行反压
var buffer = Rx.Observable.merge(source1, source2).scan(
function (arr, item) {
if (!magic) {
return arr.push(item);
}
else {
return [item]; //Clear the buffer from previous items
}
}, []);
我也希望有一个“消费者”的缓冲区,从缓冲区转移项目,并与他们做事情。我如何实现这一点,并确保消费者更新缓冲区可观察性?
编辑:我想将数据输入到SourceBuffer,但只允许在数据不更新时附加数据。这让我感到背压情况。所以我尝试创建一个controlled observable,但无法弄清楚如何使用自己的缓冲创建自己的版本。
答
所以我有这个基本的缓冲区,它将源码1和源码2中的所有内容附加到数组中。在某些情况下,缓冲区被清除。
你需要做的是:
var sourceStream = Rx.Observable.merge(soruce1, source2);
var boundary = sourceStream.lift(someOperator) // for example sourceStream.skip(3);
// someOperator is where you perform the "magic"
var subscribeToThisStream = sourceStream.buffer(boundary);
// emits all items collected in the buffer between two boundary emitions
我也希望有一个缓冲区,从缓冲区他们转移项目和做事的“消费者”。我如何实现这一点,并确保消费者更新缓冲区可观察性?
如果你想做到这一点的用户的手段,这是绝对不可取的,如果你想要做的事情在Rx方式在某些情况下甚至是不可能的。
请在问题中添加一些细节。 – tMJ