RxJava1.x中的subscribeOn,observeOn到底做了些什么
注:文中的OnSubscribe1,OnSubscribe2,Observable1等等命名是通过出现的时序来命名的,越大说明越晚出现
我们先来举个例子吧:
Observable.create(object : Observable.OnSubscribe<String> {
override fun call(it: Subscriber<in String>?) {
Timber.i("OnSubscribe call: ${Thread.currentThread().name}")
it?.onNext("onNextString")
}
})
//Schedulers.newThread()其实是NewThreadScheduler,具体就不分析了,看源码很容易找到
.subscribeOn(Schedulers.newThread())
//LooperScheduler
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Action1<Any?> {
override fun call(t: Any?) {
Timber.i(" onNext ${Thread.currentThread().name} ${t?.toString()?: "null"}")
}
})
结果如下:
OnSubscribe call: RxNewThreadScheduler-2
onNext main onNextString
总结:
- 对于subscribeOn,,每个subscribeOn都会引用上个onSubscribe,它只是改变了第一个subscribeOn上面的OnSubscribe的线程,subscribeOn方法会新建一个Observable2,同时OnSubscribe2赋值为OperatorSubscribeOn(Observable,Scheduler),OperatorSubscribeOn里面的Scheduler切换线程,让Observable执行subscribe
- 对于observeOn,新建一个Observable3对象,同时OnSubscribe3赋值为OnSubscribeLift,OnSubscribeLift处理subscribe时候,先让operator封装subscribe成ObserveOnSubscriber,再让上一个OnSubscribe去处理ObserveOnSubscriber,关键一点是observeOn切换线程是在ObserveOnSubscriber的onNext,onError等方法切换的
下面我们来详细的看看整个流程,从上面的例子来看显然OnSubscribe 的call方法执行在子线程中,而subscriber接受到消息是主线程执行的。现在我们来一步步分析其原理,create方法前面就已经分析过了,我们直接看subscribeOn:
Observable.java:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//我们写的demo是用create方法,create是直接new Observable的,那显然不是ScalarSynchronousObservable
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//这里又采用了create方法,说明这里又新建了一个Observable对象
//暂时我们把create新建立的对象为ob2,前面的Observable为ob1,ob2的OnSubscribe2为OperatorSubscribeOn
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
OperatorSubscribeOn.java:
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
//NewThreadScheduler
this.scheduler = scheduler;
//这个source为ob1
this.source = source;
}
来看看这部分的流程图:
接着我们看看observeOn方法:
Observable.java:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//同理,this不是ScalarSynchronousObservable
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//这里又有个create方法,把它作为ob3,其属性onSubscribe3为OnSubscribeLift对象
//参数onSubscribe是ob2的onSubscribe2
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
OnSubscribeLift.java:
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
// ob2的onSubscribe,也就是OperatorSubscribeOn类对象
this.parent = parent;
//OperatorObserveOn
this.operator = operator;
}
最后我们来看看Observable.subscribe(Subscriber)这个方法,前面我们就知道这个方法其实是让当前的Observable的onSubscribe执行call方法,并且把Subscriber当做参数,当执行到subscribe,当前的Observable已经是ob3了,其onSubscribe也已经是onSubscribe3(OnSubscribeLift)对象,我们来看看OnSubscribeLift的call方法:
OnSubscribeLift.java:
public void call(Subscriber<? super R> o) {
try {
//RxJavaHooks.onObservableLift(operator)这个的结果也就是operator
//operator在上面也说过是OperatorObserveOn对象
//st其实是个ObserveOnSubscriber,通过下面的代码分析可以知道
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// 这里有个地方,是封装后的st的onStart方法
st.onStart();
//这个parent是ob2的onSubscribe,也就是OperatorSubscribeOn类对象
//这点就很关键了,重新封装好的Subscriber又传给了上层ob2的onSubscribe
parent.call(st);
} catch (Throwable e) {
....
st.onError(e);
}
} catch (Throwable e) {
......
o.onError(e);
}
}
来看看OperatorObserveOn的call方法,它对初始的Subscriber进行了封装:
OperatorObserveOn.java:
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//这个是AndroidSchedulers.mainThread()---LooperScheduler
this.scheduler = scheduler;
//false
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
....
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
//封装成ObserveOnSubscriber
return parent;
....
}
接着我们看ob2的onSubscribe2,也就是OperatorSubscribeOn类对象的call方法,ObserveOnSubscriber作为参数传进去的
OperatorSubscribeOn.java(onSubscribe2):
//subscriber应该是subscriber2,即ObserveOnSubscriber
public void call(final Subscriber<? super T> subscriber) {
//在OperatorSubscribeOn构造函数我们可以知道scheduler就是一开始Schedulers.newThread-》NewThreadScheduler,这个inner是NewThreadWorker
final Worker inner = scheduler.createWorker();
//subscriber就是ObserveOnSubscriber类对象
subscriber.add(inner);
//
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
.....
//这个source在构造函数中初始化的,是ob1的引用,
//里面具体的实现其实就是ob1的onSubscribe1去调用call方法,s作为参数
source.unsafeSubscribe(s);
}
});
}
我们来看NewThreadWorker.schedule方法:
NewThreadWorker.java:
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
//返回的就是action
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
//ScheduledAction继承Runnable
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
//executor是ScheduledThreadPoolExecutor,在NewThreadWorker构造函数中就初始化了
//这里其实也就执行了ScheduledAction的run方法
//也就是action.call,这里就切换了线程,OnSubscribe1的call方法是执行在这个线程池中的
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
到这里整个流程好像就结束了,总觉得有点怪怪的,好像observeOn切换线程没起作用,仔细想想,传入到OnSubscribe2的是ObserveOnSubscriber里面具体方法还没看,像onNext,onCompleted…都是执行它的,那我们来看看ObserveOnSubscriber.onNext方法把:
ObserveOnSubscriber.java:
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
//recursiveScheduler在构造函数中初始化的,就是LooperScheduler.createWorker
recursiveScheduler.schedule(this);
}
}
LooperScheduler&HandlerWorker.java:
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
//ScheduledAction是Runnable
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
//这个消息会被ScheduledAction的run方法接受到
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}
ScheduledAction.java:
public void run() {
//这个action就是ObserveOnSubscriber,ObserveOnSubscriber继承了Action0
action.call();
}
ObserveOnSubscriber.java:
// only execute this from schedule()
//这个call方法已经改变线程了
public void call() {
.....
//child就是作为参数传进来的Subscriber
final Subscriber<? super T> localChild = this.child;
localChild.onNext(localOn.getValue(v));
.....
}
通过LooperScheduler.schedule的具体实现可以看出,也就是通过Handler进行线程切换的,即ObserveOnSubscriber.onNext工作在主线程
那么现在就有个问题,如果多次调用subscribeOn,observeOn比如说连续我两次调用subscribeOn或者连续两次调用observeOn,能不能切换线程:
Observable.create(object : Observable.OnSubscribe<String> {
override fun call(it: Subscriber<in String>?) {
Timber.i("OnSubscribe call: ${Thread.currentThread().name}")
it?.onNext("onNextString")
}
})
.subscribeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(object : Action1<Any?> {
override fun call(t: Any?) {
Timber.i(" onNext ${Thread.currentThread().name} ${t?.toString()?: "null"}")
}
})
结果:
OnSubscribe call: RxNewThreadScheduler-1
onNext RxNewThreadScheduler-1 onNextString
这里例子表明后面subscribeOn并没有改变执行的线程,我们来回顾下subscribeOn:
Observable.java:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//我们知道,传入的subscribe还是会被OperatorSubscribeOn的call处理一次的
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
OperatorSubscribeOn.java:
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//这里是切换线程
inner.schedule(new Action0() {
,.....
source.unsafeSubscribe(s);
}
});
}
什么意思呢,不管第一个subscribeOn后面执行的是在哪个线程,都是会经过第一个subscribeOn处理,然后切换线程。
接着我们来看看多个observeOn处理的情况:
Observable.create(object : Observable.OnSubscribe<String> {
override fun call(it: Subscriber<in String>?) {
Timber.i("OnSubscribe call: ${Thread.currentThread().name}")
it?.onNext("onNextString")
}
})
.observeOn(Schedulers.newThread())
.map{
Timber.i("map : ${Thread.currentThread().name}")
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Action1<Any?> {
override fun call(t: Any?) {
Timber.i(" onNext ${Thread.currentThread().name} ${t?.toString()?: "null"}")
}
})
结果:
OnSubscribe call: main
map: RxNewThreadScheduler-1
onNext main onNextString
我们在上面的分析中可以知道,observeOn方法它改变线程的关键点在于封装的ObserveOnSubscriber里面具体的onNext等方法。
我们来回顾下ObserveOnSubscriber的onNext的代码:
public void call() {
.....
//child就是作为参数传进来的Subscriber
final Subscriber<? super T> localChild = this.child;
localChild.onNext(localOn.getValue(v));
.....
}
根据上面那个例子来看,这里的loacalChild应该就是onSubscriberMap这个实例对象了,通过上文的map分析知道,onNext方法是交给了Func的call方法执行,然后再交给前一个onSubscriber处理,这前一个onSubscriber又是ObserveOnSubscriber对象,又切换线程。这里的意思就是第一个observeOn改变的事map中func的线程,第二个改变的是最后一个subscribe的线程。