图解RxJava2(三)
点击上方蓝字关注公众号
码个蛋第250次推文
今天有肯德基
作者:HuYounger
博客:http://rkhcy.github.io/
文章目录
概述
例子
源码分析
多次observeOn
0
概述
上篇文章只分析了 RxJava 中 subscribeOn 方法的实现原理,然而只使用 subscribeOn 发现上下游都是执行在子线程中。在日常开发中往往是将上游耗时任务通过 subscribeOn 指定在子线程中,下游通常是更新 UI 等需要在主线程中进行,使用 observeOn(AndroidSchedulers.mainThread()) 就能实现,那么它是怎么做到的呢?
1
例子
基于上篇文章的代码,修改上下游联系,添加 observeOn(AndroidSchedulers.mainThread())
打印如下
此时主线程中的「其他任务」没有被阻塞,上游的耗时任务在子线程 RxNewThreadScheduler-1 中执行,而下游接收任务在主线程中进行,并且事件传递不保证顺序(多次执行输出可能都不同),这也是多线程执行顺序的不确定性特点,上篇已介绍过。
2
源码分析
有前两篇分析经历,现在就轻车熟路:执行 Observable.create 、 new Observer 、Schedulers.newThread()、
subscribeOn(Scheduler) 后此时主线程应该是下面的样子
AndroidSchedulers.mainThread()
AndroidSchedulers 是 RxAndroid 中提供的,使用前需要在 Android Studio 中添加依赖。mainThread() 最后会创建 HandlerScheduler:
HandlerScheduler 也是 Scheduler 的子类,在初始化 HandlerScheduler 的时候创建了一个持有主线程 Looper 的 Handler ,可以猜想后面线程切换很有可能就是 Handler 机制的那一套。此时的主线程
observeOn(Scheduler scheduler)
该方法返回 Observable ,创建了 ObservableObserveOn(已经习惯了,就这几个英文单词排列组合),它也是 Observable 的子类,结合我们举的例子,就给它起名肯德基,肯德基持有黄焖鸡饭店的引用,初始化如下:
此时的主线程
subscribe(Observer observer)
由上两篇分析可知,这里会先去执行 ObservableObserveOn(肯德基) 的 subscribeActual(observer) 方法,这里的 observer 是顾客小明
步骤① 和上篇一样,这里也会创建 Worker,具体实现在 HandlerScheduler 中
并把之前持有主线程 Looper 的 Handler 传进去。
步骤② 先创建了 ObserveOnObserver(总起这种很操蛋的名字),作为
ObservableObserveOn(肯德基)的内部类,它是 BasicIntQueueDisposable 的子类(保证原子性、拥有操作队列功能、保证一次性操作),实现Observer接口(也是个顾客)。结合例子,就给它起名叫顾客小强,只是这个小强功能比较强大,小强持有小明的引用。
接着执行 ObservableSubscribeOn(黄焖鸡饭店)的 subscribe ,具体实现是 subscribeActual :
看下小强(ObserveOnObserver)的 onSubscribe:
这里创建了一个队列,大小为128。到目前为止所有操作都发生在主线程中。
回到上面,继续执行
上篇文章已经介绍过了,具体流程如下
在上篇介绍到这的时候说,接下来的操作都是在子线程中进行的,那此时这里会有什么转折呢?
后续还有:服务员端菜(CreateEmitter.onNext) —> 顾客小红拿到菜(SubscribeOnObserver.onNext) ,到这里都是执行在子线程中(卧槽,怎么还没切线程啊,这都快走完了),接着小强拿到菜(ObserveOnObserver.onNext),看下代码
最后执行 HandlerScheduler 的 schedule
原来用的是 Handler 机制来完成的,那 Runnable 具体执行的是什么呢?看下小强的 run 方法
就是从队列中取出传过来的数据,交给小明的 onNext 方法执行,所以小明的 onNext 是在主线程中执行,这部分流程如下(Queue 即小强内部维护的队列):
上图的事件调度不保证顺序,只是模拟了其中一种情况。
3
多次observeOn
上面我先把下游接收事件指定在主线程,再指定在一个新的线程,打印如下:
看到此时下游接收事件被成功执行在后指定的新线程,这里是怎么实现的呢?分解下代码
执行到这的时候应该是这样的
这也是上面分析过的, subscribeOn 返回的 Observable 称为黄焖鸡店(ObservableSubscribeOn),observeOn 返回的 Observable 称为肯德基1号店(ObservableObserveOn),肯德基1号店持有黄焖鸡店的引用;接着
执行到这的时候应该是这样的
把第二次 observeOn 返回的 Observable 称为肯德基2号店(ObservableObserveOn),肯德基2号店持有1号店的引用;接着
会先执行肯德基2号店的 subscribeActual 方法,这里的 observer 是小明
这里会创建小强(ObserveOnObserver),为了和后面区分开,就叫他2号店小强,2号店小强持有小明的引用,之后执行肯德基1号店的 subscribeActual ,observer 是肯德基2号店小强
这里又创建了小强,就叫他肯德基1号店小强,1号店小强持有2号店小强的引用,整个过程如下
接着执行黄焖鸡店的 subscribeActual,observer 是肯德基1号店小强
执行到上面 s.onSubscribe(parent) 是应该是这样的
因此 onSubscribe 方法还是执行在主线程中;其实看到这就有点明白了,就是一层层的回调…接着执行后面的流程,直接上图
上图省略了其他事件,并且省略了事件入队的过程,至此分析完毕。