如何在RxJava2中使用带有lambda表达式的DisposableObserver
问题描述:
我的用例是想在我的onNext中的特定条件之后进行处理。所以试图使用DisposableObserver。这是工作如何在RxJava2中使用带有lambda表达式的DisposableObserver
Observable.just(1, 2, 3, 4)
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext() received: " + integer);
if (integer == 2) {
dispose();
}
}
@Override
public void onError(Throwable e) { System.out.println("onError()"); }
@Override
public void onComplete() { System.out.println("onComplete()"); }
}
);
现在,如果你尝试用lambda来代替这一点,对待lambda作为
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
这样做是为了现在这样的代码。通过从onSubscribe保存一次性,然后调用disposable.dispose();从onNext。
private Disposable disposable;
private void disposableObserverTest() {
Observable.just(1, 2, 3, 4)
.subscribe(integer -> {
System.out.println("onNext() received: " + integer);
if (integer == 2) {
disposable.dispose();
}
}, throwable -> System.out.println("error"),
() -> System.out.println("complete"),
disposable1 -> {
this.disposable = disposable1;
});
}
但是,如果您想直接调用dispose()如何使用lambda表达式?
答
这是因为在第一种情况下,你叫
subscribe(DisposableObserver observer)
而在第二种情况下,你叫
subscribe(Action1<? extends Integer> onNext, Action1<? extends Throwable> onError, Action0 onComplete)
这意味着在第二种情况下,你不抱参考DisposableObserver
,因此你不能打电话给dispose()
。
+0
确切的问题。所以我想在第二种情况下使用带有lambda表达式的DisposableObserver。 – bpr10
答
您可以使用takeUntil关闭观测值。
@Test
public void takeUntil() throws Exception {
Observable.just(1, 2, 3, 4)
.takeUntil(integer -> integer == 2)
.test()
.assertValues(1, 2);
}
“这给出了一个错误” - 什么样的? – azizbekian
错误是它不会查找dispose(),因为它将lambda视为 ConsumeronNext,Consumer onError, Action onComplete。 但我希望lambda成为DisposableObserver。 – bpr10