Asynctask status &&取消等效于RxJava2 Observable?

Asynctask status &&取消等效于RxJava2 Observable?

问题描述:

我想学习RxJava2,并将我的AsyncTasks转换为Observable。Asynctask status &&取消等效于RxJava2 Observable?

我有下面这段代码,我试图转换。

if(asyncTask.getStatus() == AsyncTask.Status.RUNNING){ 
    asyncTask.cancel(); 
} 
asyncTask = new CustomTask(); 
asyncTask.execute(input); 

我试图重新创建以下与一次性使用。

Disposable currentTask; 
PublishSubject input = PublishSubject.create(); 

对于每一个输入

if(currentTask != null) currentTask.dispose(); 

currentTask = input 
    .map(// Network calls 
     // returns CustomObject) 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(result -> { 
       // do work with result 
       }, throwable -> Log.e(TAG, throwable.toString())); 

然而,currentTask总是。为什么?这是做错了吗?

+0

并非所有'subscribe(...)'方法都返回'Disposable'。你能否至少暗示你传递给'subscribe(...)'的参数? – Jon

+0

你是否试图在任何时候清空'currentTask'?另外,你可以添加任何其他你使用'PublishSubject输入'的地方吗?我怀疑你可能会滥用这个主题,这是造成你的问题。 – Jon

您正确使用Disposable,但我只能假设你正在搞这个问题。 rx中的主题既可以是发布者也可以是订阅者......主题不一定要等到subscribe(...)开始发布项目。因此,我不建议用任何形式的Subject来替换您的AsyncTask

你可以得到类似的,更具有确定性的行为,这样做虽然:

Observable<CustomObject> networkObservable = 
      Observable.create(emitter -> 
        { 
         try { 
          CustomObject object = doNetworking(); 
          emitter.onNext(object); 
          emitter.onComplete(); 
         } catch (Exception e) { 
          emitter.onError(e); 
         } 
        } 
      ); 

if(currentTask != null) currentTask.dispose(); 

currentTask = networkObservable.subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     // this next subscribe is similar to AsyncTask.execute() as it starts the stream 
     .subscribe(result -> { 
      // do work with result 
      }, throwable -> Log.e(TAG, throwable.toString())); 

另外,还要考虑寻找到SerialDisposable,你不必做那些空/处置检查

SerialDisposable serialDisposable = new SerialDisposable(); 

原子化:设置该容器上的下一个一次性物品,并处理前一个(如果有的话),或者如果容器已被处置。

Disposable disposable = networkObservable.subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(...); 

serialDisposable.set(disposable); // this will call dispose on the previous task if it exists