RxJava:从Nullable对象创建可观察对象

问题描述:

我是RxJava的新手。我有这样的代码。我从repository.getStatus()返回的字符串中创建一个observable。如果它是空的,我必须继续使用proceed方法而不做任何事情。如果不为零,则必须拨打repository.init(),然后拨打proceed。这是我所做的。RxJava:从Nullable对象创建可观察对象

Flowable.just(repository.getStatus()) // getStatus return a string , which can be null 
     .doOnError(throwable -> proceed()) 
     .flatMapCompletable(s -> repository.init()) 
     .observeOn(Schedulers.io()) 
     .subscribeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new DisposableCompletableObserver() { 
         @Override 
         public void onComplete() { 
          proceed(); 
         } 

         @Override 
         public void onError(@NonNull Throwable e) { 
          handleErrors(t)); 
         } 
        }); 

private void proceed(){ 
    // 
} 

如果repository.getStatus()为空,不会调用它的doOnError(throwable -> proceed())? 现在它在repository.getStatus()为空时崩溃。处理这种用例的最佳Rx方式是什么?

+0

检查[这](https://*.com/questions/ 39781420/handling-null-in-rxjava2) – masp

+0

@masp,但正如接受的答案中所述,'Optional.ofNullable'需要上面的API 24。我的最低版本是19 – Jrd

+0

除了答案,还有一个[链接](https://github.com/ReactiveX/RxJava/issues/4644)在问题中的对话,你可以从中很好地理解。 – masp

相反管道repository.getStatus()repository.init()的,你可以只从任何这两个基础上,建立一个可观测你的状况。

尝试是这样的

Flowable.just(repository.getStatus() == null ? Completable.complete() : repository.init()) 
      .flatMapCompletable(c -> c) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new DisposableCompletableObserver() { 
       @Override 
       public void onComplete() { 
        proceed(); 
       } 

       @Override 
       public void onError(@NonNull Throwable e) { 
        handleErrors(t)); 
       } 
      }); 

你甚至可以让你repository.getStatus()作为一个可能做这样的事情

Maybe.create((MaybeOnSubscribe<String>) e -> { 
     if (repository.getStatus() != null) e.onSuccess(repository.getStatus()); 
     e.onComplete(); 
    }).flatMapCompletable(s -> Completable.fromAction(() -> repository.init())) 
      .subscribe(new DisposableCompletableObserver() { 
       @Override 
       public void onComplete() { 
        proceed(); 
       } 

       @Override 
       public void onError(@NonNull Throwable e) { 
        handleErrors(t)); 
       } 
      }); 

由于您使用的是Rx2,因此您可以使用新的Maybe类型,即类似Optional的流式传输。不幸的是,它没有一个fromNullable工厂方法(见https://github.com/ReactiveX/RxJava/issues/5019),但你可以这样做:

Maybe.fromCallable(() -> repository.getStatus())