如何在RxJava2中使用带有lambda表达式的DisposableObserver

问题描述:

我的用例是想在我的onNext中的特定条件之后进行处理。所以试图使用DisposableObserver。这是工作如何在RxJava2中使用带有lambda表达式的DisposableObserver

Observable.just(1, 2, 3, 4) 
    .subscribe(new DisposableObserver<Integer>() { 
        @Override 
        public void onNext(Integer integer) { 
         System.out.println("onNext() received: " + integer); 
         if (integer == 2) { 
         dispose(); 
         } 
        } 
        @Override 
        public void onError(Throwable e) { System.out.println("onError()"); } 
        @Override 
        public void onComplete() { System.out.println("onComplete()"); } 
        } 
    ); 

现在,如果你尝试用lambda来代替这一点,对待lambda作为

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete) 

这样做是为了现在这样的代码。通过从onSubscribe保存一次性,然后调用disposable.dispose();从onNext。

private Disposable disposable; 
    private void disposableObserverTest() { 
    Observable.just(1, 2, 3, 4) 
     .subscribe(integer -> { 
       System.out.println("onNext() received: " + integer); 
       if (integer == 2) { 
       disposable.dispose(); 
       } 

      }, throwable -> System.out.println("error"), 
      () -> System.out.println("complete"), 
      disposable1 -> { 
       this.disposable = disposable1; 
      }); 
    } 

但是,如果您想直接调用dispose()如何使用lambda表达式?

+1

“这给出了一个错误” - 什么样的? – azizbekian

+0

错误是它不会查找dispose(),因为它将lambda视为 ConsumeronNext,Consumer onError, Action onComplete。 但我希望lambda成为DisposableObserver。 – bpr10

这是因为在第一种情况下,你叫

subscribe(DisposableObserver observer) 

而在第二种情况下,你叫

subscribe(Action1<? extends Integer> onNext, Action1<? extends Throwable> onError, Action0 onComplete) 

这意味着在第二种情况下,你不抱参考DisposableObserver,因此你不能打电话给dispose()

+0

确切的问题。所以我想在第二种情况下使用带有lambda表达式的DisposableObserver。 – bpr10

您可以使用takeUntil关闭观测值。

@Test 
public void takeUntil() throws Exception { 
    Observable.just(1, 2, 3, 4) 
      .takeUntil(integer -> integer == 2) 
      .test() 
      .assertValues(1, 2); 
}