RxJava2可观测背压
最近我意识到我不明白RxJava2
背压是如何工作的。RxJava2可观测背压
我做了小测试,我希望它应该会失败,MissingBackpressureException
例外:
@Test
public void testBackpressureWillFail() {
Observable.<Integer>create(e -> {
for (int i = 0; i < 10000; i++) {
System.out.println("Emit: " + i);
e.onNext(i);
}
e.onComplete();
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(i -> {
Thread.sleep(100);
System.out.println("Processed:" + i);
})
.blockingSubscribe();
}
系统显示出未来:
Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000
Processed:0
Processed:1
Processed:2
...
Processed:10000
为什么它不产生MissingBackpressureException
。
我希望e.onNext(i);
将把项目进入缓冲区的ObservableObserveOn
和之后的尺寸大于static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());
应该抛出MissingBackpressureException
这不会发生。缓冲区是否自动增长?如果没有物品存储在哪里?
这是因为背压仅在RxJava2中移出Flowable
,请参阅here。
如果您将切换到Flowable
与BackpressureStrategy.MISSING
你会得到例外。
这也意味着,在你的情况,你确实有缓冲可以自动增长, 从observerOn
文档:
修改的ObservableSource用于在指定调度执行其排放量和通知,异步与无限的缓冲。 ..
谢谢,请你解释一下,如果缓冲区是*的,Observable中的Observable
但我认为,根据文档,这是缓冲区增量步骤的“岛”的可配置大小。 – yosriz
'Observable's在RxJava2不支持背压,只有'Flowable's做 –
我知道,他们不支持backpreassure,但我认为不支持意味着MissingBackpressureException会抛出,不是aut o增长的缓冲区。 –