如何从可观察到的停止间隔

问题描述:

我是新的rxjava,我想每2秒执行一次轮询任务50次,也可能会终止,如果某些条件在任务中遇到,我试图使用Observable.interval但我发现除了抛出异常,没有办法终止它,有没有其他操作符可以实现我的目标? 顺便说一句,这个功能作为API来提供可观察的对象,所以我无法通过未加注释来控制订阅者和终止。如何从可观察到的停止间隔

Observable.interval(timeout, interval, TimeUnit.SECONDS) 
.flatmap(task - > task) 

我猜Observable.takeUntil(stopPredicate)Observable.takeWhile(predicate)可以帮助你:

Observable.interval(timeout, interval, TimeUnit.SECONDS) 
.takeWhile(val -> val < 42) 

这里观察到的将终止在第42尝试

+0

谢谢!这是我想要的。 – iammini

您可以将Observable.interval使用takeUntil如下停止:

Observable.interval(0, 1, TimeUnit.SECONDS) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .takeUntil(new Predicate<Long>() { 
       @Override 
       public boolean test(Long aLong) throws Exception { 
        return aLong == 10; 
       } 
      }) 
      .subscribe(new Consumer<Long>() { 
       @Override 
       public void accept(Long aLong) throws Exception { 
        Log.d(TAG, "Tick: " + aLong); 
       } 
      }); 

在这个例子中, 10秒后,可观察到的将停止。