图解RxJava2(三)

图解RxJava2(三)

点击上方蓝字关注公众号

码个蛋第250次推文

图解RxJava2(三)

今天有肯德基

作者:HuYounger

博客:http://rkhcy.github.io/

文章目录

  • 概述

  • 例子

  • 源码分析

  • 多次observeOn

0

概述

上篇文章只分析了 RxJava 中 subscribeOn 方法的实现原理,然而只使用 subscribeOn 发现上下游都是执行在子线程中。在日常开发中往往是将上游耗时任务通过 subscribeOn 指定在子线程中,下游通常是更新 UI 等需要在主线程中进行,使用 observeOn(AndroidSchedulers.mainThread()) 就能实现,那么它是怎么做到的呢?

1

例子

基于上篇文章的代码,修改上下游联系,添加 observeOn(AndroidSchedulers.mainThread())

图解RxJava2(三)

打印如下

图解RxJava2(三)

此时主线程中的「其他任务」没有被阻塞,上游的耗时任务在子线程 RxNewThreadScheduler-1 中执行,而下游接收任务在主线程中进行,并且事件传递不保证顺序(多次执行输出可能都不同),这也是多线程执行顺序的不确定性特点,上篇已介绍过。

2

源码分析

图解RxJava2(三)

有前两篇分析经历,现在就轻车熟路:执行 Observable.create 、 new Observer 、Schedulers.newThread()、

subscribeOn(Scheduler) 后此时主线程应该是下面的样子

图解RxJava2(三)

AndroidSchedulers.mainThread()

AndroidSchedulers 是 RxAndroid 中提供的,使用前需要在 Android Studio 中添加依赖。mainThread() 最后会创建 HandlerScheduler:

图解RxJava2(三)

图解RxJava2(三)

HandlerScheduler 也是 Scheduler 的子类,在初始化 HandlerScheduler 的时候创建了一个持有主线程 Looper 的 Handler ,可以猜想后面线程切换很有可能就是 Handler 机制的那一套。此时的主线程

图解RxJava2(三)

observeOn(Scheduler scheduler)

图解RxJava2(三)

该方法返回 Observable ,创建了 ObservableObserveOn(已经习惯了,就这几个英文单词排列组合),它也是 Observable 的子类,结合我们举的例子,就给它起名肯德基,肯德基持有黄焖鸡饭店的引用,初始化如下:

图解RxJava2(三)

此时的主线程

图解RxJava2(三)

subscribe(Observer observer)

由上两篇分析可知,这里会先去执行 ObservableObserveOn(肯德基) 的 subscribeActual(observer) 方法,这里的 observer 是顾客小明

图解RxJava2(三)

步骤① 和上篇一样,这里也会创建 Worker,具体实现在 HandlerScheduler 中

图解RxJava2(三)

并把之前持有主线程 Looper 的 Handler 传进去。

图解RxJava2(三)

步骤② 先创建了 ObserveOnObserver(总起这种很操蛋的名字),作为

ObservableObserveOn(肯德基)的内部类,它是 BasicIntQueueDisposable 的子类(保证原子性、拥有操作队列功能、保证一次性操作),实现Observer接口(也是个顾客)。结合例子,就给它起名叫顾客小强,只是这个小强功能比较强大,小强持有小明的引用。

图解RxJava2(三)

接着执行 ObservableSubscribeOn(黄焖鸡饭店)的 subscribe ,具体实现是 subscribeActual :

图解RxJava2(三)

看下小强(ObserveOnObserver)的 onSubscribe:

图解RxJava2(三)

这里创建了一个队列,大小为128。到目前为止所有操作都发生在主线程中。

图解RxJava2(三)

回到上面,继续执行

图解RxJava2(三)

上篇文章已经介绍过了,具体流程如下

图解RxJava2(三)

在上篇介绍到这的时候说,接下来的操作都是在子线程中进行的,那此时这里会有什么转折呢?

图解RxJava2(三)

后续还有:服务员端菜(CreateEmitter.onNext) —> 顾客小红拿到菜(SubscribeOnObserver.onNext) ,到这里都是执行在子线程中(卧槽,怎么还没切线程啊,这都快走完了),接着小强拿到菜(ObserveOnObserver.onNext),看下代码

图解RxJava2(三)

最后执行 HandlerScheduler 的 schedule

图解RxJava2(三)

原来用的是 Handler 机制来完成的,那 Runnable 具体执行的是什么呢?看下小强的 run 方法

图解RxJava2(三)

图解RxJava2(三)

就是从队列中取出传过来的数据,交给小明的 onNext 方法执行,所以小明的 onNext 是在主线程中执行,这部分流程如下(Queue 即小强内部维护的队列):

图解RxJava2(三)

上图的事件调度不保证顺序,只是模拟了其中一种情况。

3

多次observeOn

图解RxJava2(三)

上面我先把下游接收事件指定在主线程,再指定在一个新的线程,打印如下:

图解RxJava2(三)

看到此时下游接收事件被成功执行在后指定的新线程,这里是怎么实现的呢?分解下代码

图解RxJava2(三)

执行到这的时候应该是这样的

图解RxJava2(三)

这也是上面分析过的, subscribeOn 返回的 Observable 称为黄焖鸡店(ObservableSubscribeOn),observeOn 返回的 Observable 称为肯德基1号店(ObservableObserveOn),肯德基1号店持有黄焖鸡店的引用;接着

图解RxJava2(三)

执行到这的时候应该是这样的

图解RxJava2(三)

把第二次 observeOn 返回的 Observable 称为肯德基2号店(ObservableObserveOn),肯德基2号店持有1号店的引用;接着

图解RxJava2(三)

会先执行肯德基2号店的 subscribeActual 方法,这里的 observer 是小明

图解RxJava2(三)

这里会创建小强(ObserveOnObserver),为了和后面区分开,就叫他2号店小强,2号店小强持有小明的引用,之后执行肯德基1号店的 subscribeActual ,observer 是肯德基2号店小强

图解RxJava2(三)

这里又创建了小强,就叫他肯德基1号店小强,1号店小强持有2号店小强的引用,整个过程如下

图解RxJava2(三)

接着执行黄焖鸡店的 subscribeActual,observer 是肯德基1号店小强

图解RxJava2(三)

执行到上面 s.onSubscribe(parent) 是应该是这样的

图解RxJava2(三)

因此 onSubscribe 方法还是执行在主线程中;其实看到这就有点明白了,就是一层层的回调…接着执行后面的流程,直接上图

图解RxJava2(三)

上图省略了其他事件,并且省略了事件入队的过程,至此分析完毕。

图解RxJava2(三)