Asynctask status &&取消等效于RxJava2 Observable?
问题描述:
我想学习RxJava2,并将我的AsyncTasks转换为Observable。Asynctask status &&取消等效于RxJava2 Observable?
我有下面这段代码,我试图转换。
if(asyncTask.getStatus() == AsyncTask.Status.RUNNING){
asyncTask.cancel();
}
asyncTask = new CustomTask();
asyncTask.execute(input);
我试图重新创建以下与一次性使用。
Disposable currentTask;
PublishSubject input = PublishSubject.create();
对于每一个输入
if(currentTask != null) currentTask.dispose();
currentTask = input
.map(// Network calls
// returns CustomObject)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> {
// do work with result
}, throwable -> Log.e(TAG, throwable.toString()));
然而,currentTask
总是空。为什么?这是做错了吗?
答
您正确使用Disposable
,但我只能假设你正在搞这个问题。 rx中的主题既可以是发布者也可以是订阅者......主题不一定要等到subscribe(...)
开始发布项目。因此,我不建议用任何形式的Subject
来替换您的AsyncTask
。
你可以得到类似的,更具有确定性的行为,这样做虽然:
Observable<CustomObject> networkObservable =
Observable.create(emitter ->
{
try {
CustomObject object = doNetworking();
emitter.onNext(object);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
);
if(currentTask != null) currentTask.dispose();
currentTask = networkObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// this next subscribe is similar to AsyncTask.execute() as it starts the stream
.subscribe(result -> {
// do work with result
}, throwable -> Log.e(TAG, throwable.toString()));
另外,还要考虑寻找到SerialDisposable
,你不必做那些空/处置检查
SerialDisposable serialDisposable = new SerialDisposable();
原子化:设置该容器上的下一个一次性物品,并处理前一个(如果有的话),或者如果容器已被处置。
Disposable disposable = networkObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
serialDisposable.set(disposable); // this will call dispose on the previous task if it exists
并非所有'subscribe(...)'方法都返回'Disposable'。你能否至少暗示你传递给'subscribe(...)'的参数? – Jon
你是否试图在任何时候清空'currentTask'?另外,你可以添加任何其他你使用'PublishSubject输入'的地方吗?我怀疑你可能会滥用这个主题,这是造成你的问题。 – Jon