如何使用rxjava2取消Observable.timer?

问题描述:

我在我的代码重试机制,我用下面的行执行我重试逻辑。例如,我生成一个随机毫秒来延迟执行。当计时器滴答到30 * 1000毫秒时,我想取消这个计时器。我如何取消这个定时器并立即执行我的逻辑。如何使用rxjava2取消Observable.timer?

//register retryWhen 
Observable.retryWhen(new RetryWhenException()); 

//retry code. 
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>>{  
public Observable<?> apply(final Observable<? extends Throwable> observable) throws Exception { 
      return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() { 
       @Override 
       public Wrapper apply(Throwable throwable, Integer integer) throws Exception { 
        return new Wrapper(throwable, integer); 
       } 
      }).flatMap(new Function<Wrapper, Observable<?>>() { 
       @Override 
       public Observable<?> apply(Wrapper wrapper) throws Exception { 
        long delay = 60 * 1000; 
        //How can I add some code here to cancel this time and execute this api call immediately if I receive a event like network gets back? 
        return Observable.timer(delay, TimeUnit.MILLISECONDS); 
       } 
      }); 
     } 
} 

在此先感谢。

如果我正确理解你的问题,你要能够多个条件结合起来,重试,这意味着重试最多(定时器),一定时间后会发生,甚至更早一些事件后会发生(网络连接的)。
在这种情况下,你需要这两个事件结合起来,首先,你需要一些Observable通知有关的网络事件(这是一个不同的讨论如何创建它,不应该是一个问题来包装与可观测系统广播事件) ,那么你可以做这样的事情:

private Observable<NetworkState> networkStateObservable; 

public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>> { 
    public Observable<?> apply(
      final Observable<? extends Throwable> observable) throws Exception { 
     return observable.zipWith(Observable.range(1, count + 1), Wrapper::new) 
       .flatMap(wrapper -> { 
        long delay = 60 * 1000; 
        Observable<NetworkState> networkConnectedEvents = 
          networkStateObservable.filter(networkState -> networkState.isConnected()) 
           take(1); 
        Observable<Long> timer = Observable.timer(delay, TimeUnit.MILLISECONDS); 

        return Observable.amb(Arrays.asList(networkConnectedEvents, timer)); 
       }); 
    } 
} 

网络状态Observable过滤连接时,只得到通知,也take(1)是确保第一时间之后我们会得到通知将取消订阅(没有进一步需要听)。
amb()运营商似乎是确切的适合这里,因为它会选择Observable会发出冷杉,并且会取消其他的,万一这意味着网络Observable发出定时器之前,定时器Observable将退订(=定时器将被取消)。

编辑:

删除错误takeUntil(networkConnectedEvents),它并不需要作为AMB如果需要的话会做退订。

+0

yosriz嗨, 请您帮忙看看我应该如何正确地发出networkStateObservable?我在下面发布了我的代码。非常感谢。 – user3034559

感谢这么多的答复。这正是我所期待的。我创建了一个网络可观察包装如你所描述的, 似乎networkStateObservable onNetworkChanged后没有发出()被调用(state.isConnected()为true)。 如何让networkStateObservable正确发射?非常感谢。 我叫下面的线

e.onNext(state); 
e.onComplete(); 

是我错了发射networkStateObservable?

Observable<NetworkState> networkStateObservable = getNetworkObservableWrapper(); 
public Observable<NetworkState> getNetworkObservableWrapper() { 
     return Observable.create(new ObservableOnSubscribe<NetworkState>() { 
      @Override 
      public void subscribe(final ObservableEmitter<NetworkState> e) throws Exception { 
       final NetworkChangedListener listener = new NetworkChangedListener() { 
        @Override 
        public void onNetworkChanged(NetworkState state) { 
         //Below lines were called, but Observable.amb(Arrays.asList(networkConnectedEvents, timer)); didn't work.      
         e.onNext(state); 
         e.onComplete(); 
        } 
       }; 
       registerNetworkChanged(listener); 
      } 
     }); 
    } 
+0

它似乎是由.takeUntil(networkConnectedEvents)引起的。如果我删除了这一行,它工作正常。 – user3034559

+0

是啊,其实我把它错误,我从答案删除它,(如果需要的话,从计时器退订)安博会做取消 – yosriz

+0

猜测的问题:如果你的'registerNetworkChanged()'覆盖现有的网络改变了听众,什么可能发生是这个Observable订阅了两次(一个在amb并且在takeUntil时),因此第二个takeUntil可能会覆盖现有的侦听器。 – yosriz