RxJava:观察到的,默认的线程
我有以下代码:RxJava:观察到的,默认的线程
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
而这里的输出:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要的细节:我打电话从另一个线程subscribe
方法(main
线程在Android中)。
所以看起来像Observable
类是同步的,在默认情况下,它发出事件(s.onNext
)在同一个线程执行的一切(运营商如map
+通知用户),对不对?我想知道......它是有意的行为还是我误解了某些东西?其实我期望至少onNext
和onComplete
回调将在调用者的线程上被调用,而不是在发送事件的那个上。我是否正确理解在这种特殊情况下实际的调用者线程无关紧要?至少在异步生成事件时。
另一个值得关注的 - 如果我收到了一些可观察到从一些外部源的参数(即我不产生对我自己)......有没有办法对我来说它的用户检查其是否是同步或异步,我只需明确指定我想通过subscribeOn
和observeOn
方法接收回调的位置,对吧?
谢谢!
RxJava是unopinionated关于并发。如果您不使用observeOn/subscribeOn等任何其他mechanisem,它将在订阅线程上产生值。请不要在运营商中使用Thread等低级构造,否则可能会破坏合同。
由于使用线程时,onNext将从调用线程调用(“后台线程1”)。订阅发生在调用(UI-Thread)上。链中的每个运算符都将从'background-thread-1'调用线程调用。订阅onNext也将从'background-thread-1'被调用。
如果要产生不上调用线程使用的值:subscribeOn。如果你想把线程切换回主要用途observeOn链中的某个地方。最有可能在订阅它之前。
实施例:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads
.subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Last will overwrite
.map(integer -> integer) // map happens on Computational-Threads
.observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
.subscribe(integer -> {
// called from mainThread
});
这里是一个很好explanitation。 http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html