如何使用rxjava2取消Observable.timer?
我在我的代码重试机制,我用下面的行执行我重试逻辑。例如,我生成一个随机毫秒来延迟执行。当计时器滴答到30 * 1000毫秒时,我想取消这个计时器。我如何取消这个定时器并立即执行我的逻辑。如何使用rxjava2取消Observable.timer?
//register retryWhen
Observable.retryWhen(new RetryWhenException());
//retry code.
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>>{
public Observable<?> apply(final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
@Override
public Wrapper apply(Throwable throwable, Integer integer) throws Exception {
return new Wrapper(throwable, integer);
}
}).flatMap(new Function<Wrapper, Observable<?>>() {
@Override
public Observable<?> apply(Wrapper wrapper) throws Exception {
long delay = 60 * 1000;
//How can I add some code here to cancel this time and execute this api call immediately if I receive a event like network gets back?
return Observable.timer(delay, TimeUnit.MILLISECONDS);
}
});
}
}
在此先感谢。
如果我正确理解你的问题,你要能够多个条件结合起来,重试,这意味着重试最多(定时器),一定时间后会发生,甚至更早一些事件后会发生(网络连接的)。
在这种情况下,你需要这两个事件结合起来,首先,你需要一些Observable
通知有关的网络事件(这是一个不同的讨论如何创建它,不应该是一个问题来包装与可观测系统广播事件) ,那么你可以做这样的事情:
private Observable<NetworkState> networkStateObservable;
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>> {
public Observable<?> apply(
final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), Wrapper::new)
.flatMap(wrapper -> {
long delay = 60 * 1000;
Observable<NetworkState> networkConnectedEvents =
networkStateObservable.filter(networkState -> networkState.isConnected())
take(1);
Observable<Long> timer = Observable.timer(delay, TimeUnit.MILLISECONDS);
return Observable.amb(Arrays.asList(networkConnectedEvents, timer));
});
}
}
网络状态Observable
过滤连接时,只得到通知,也take(1)
是确保第一时间之后我们会得到通知将取消订阅(没有进一步需要听)。 amb()
运营商似乎是确切的适合这里,因为它会选择Observable
会发出冷杉,并且会取消其他的,万一这意味着网络Observable
发出定时器之前,定时器Observable
将退订(=定时器将被取消)。
编辑:
删除错误takeUntil(networkConnectedEvents),它并不需要作为AMB如果需要的话会做退订。
感谢这么多的答复。这正是我所期待的。我创建了一个网络可观察包装如你所描述的, 似乎networkStateObservable onNetworkChanged后没有发出()被调用(state.isConnected()为true)。 如何让networkStateObservable正确发射?非常感谢。 我叫下面的线
e.onNext(state);
e.onComplete();
是我错了发射networkStateObservable?
Observable<NetworkState> networkStateObservable = getNetworkObservableWrapper();
public Observable<NetworkState> getNetworkObservableWrapper() {
return Observable.create(new ObservableOnSubscribe<NetworkState>() {
@Override
public void subscribe(final ObservableEmitter<NetworkState> e) throws Exception {
final NetworkChangedListener listener = new NetworkChangedListener() {
@Override
public void onNetworkChanged(NetworkState state) {
//Below lines were called, but Observable.amb(Arrays.asList(networkConnectedEvents, timer)); didn't work.
e.onNext(state);
e.onComplete();
}
};
registerNetworkChanged(listener);
}
});
}
它似乎是由.takeUntil(networkConnectedEvents)引起的。如果我删除了这一行,它工作正常。 – user3034559
是啊,其实我把它错误,我从答案删除它,(如果需要的话,从计时器退订)安博会做取消 – yosriz
猜测的问题:如果你的'registerNetworkChanged()'覆盖现有的网络改变了听众,什么可能发生是这个Observable订阅了两次(一个在amb并且在takeUntil时),因此第二个takeUntil可能会覆盖现有的侦听器。 – yosriz
yosriz嗨, 请您帮忙看看我应该如何正确地发出networkStateObservable?我在下面发布了我的代码。非常感谢。 – user3034559