RxJava:热观测量的消费者

RxJava:热观测量的消费者

问题描述:

我试图去与RxJava错综复杂的抓地力,但打了一个新手的问​​题:RxJava:热观测量的消费者

我试图从一冷一营造热销观察到的,并订阅两位消费者哪些进程以不同的速度推送事件。这里是一个代码片段:

ConnectableObservable<Long> ob = Observable.interval(200, TimeUnit.MILLISECONDS) 
    .publish(); 
ob.connect();  

Consumer<Long> withSleep = (Long t) -> { 
    System.out.println("Second : " + t); 
    sleep(1); 
}; 

Consumer<Long> noSleep = (Long t) -> { 
    System.out.println("First : " + t); 
}; 

sleep(2); 

ob.observeOn(Schedulers.newThread()).subscribe(noSleep); 

ob.observeOn(Schedulers.newThread()).subscribe(withSleep); 

sleep(5); 

sleep(2)只是为了看看observable是否已经开始发射。事实上,这个打印最初如预期的那样。

  1. 第一:10
  2. 第二:10
  3. 第一:11
  4. 第一:12
  5. 第一:13
  6. 第一:14
  7. 第二:11
  8. 第一个:15
  9. 第一:16
  10. 第一:17

但随后的第二消费者(具有较长的处理时间,由1秒睡眠模拟的),拾起在序列的情况下(输出线7),而不是目前的事件(没有。 14),正如我期望从一个热门的可观察者那样。不管是热门的观察者,不管用户,还是订阅者在选择什么时候推送什么(假设没有特定的,明确的背压策略),都只是继续开火?

我需要改变什么才能让第二个消费者简单地拿起当前生成的任何东西(即在上例中显示14而不是11)?

任何帮助将不胜感激。

热或冷,运营商如发布和观察在订阅后保持连续,因此即使处理时间超过排放速率,您也将获得所有事件。

为了避免在第二种情况下处理旧的条目,你必须连锁经营是降事件,并且不缓存太多之一:

ob.toFlowable(BackpressureStrategy.DROP) 
    .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread()) 
    .rebatchRequests(1) 
    .subscribe(withSleep); 

ob.toFlowable(BackpressureStrategy.LATEST) 
    .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread()) 
    .rebatchRequests(1) 
    .subscribe(withSleep); 
+0

谢谢,这样做的招。但我不明白为什么需要延迟操作符(没有它,当然不起作用)。我认为这是为了一些需要的副作用(缓冲区,线程?),但它看起来有点做作,而不是我期望从这个操作符。另外,似乎删除延迟运算符中的调度程序会产生相同的结果,或者我错过了什么? –

+0

延迟不会缓冲或干扰背压请求,因此不会像'observeOn'那样预取数据,让流实际上跳过处理项目时发出的数据。 – akarnokd