RxJava 2.x 之图解创建、订阅、发射流程

从一个例子开始

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 3; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
                Log.d(TAG, "subscribe " + Thread.currentThread().getName());
            }
        }).subscribeOn(Schedulers.newThread())
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer value) throws Exception {
                        Log.d(TAG, "apply " + Thread.currentThread().getName());
                        return "apply " + value;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new ResourceObserver<String>() {
                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "onNext " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete " + Thread.currentThread().getName());
                    }
                });

来看看输出:

10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: create RxNewThreadScheduler-1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 0
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 2
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onComplete main

可以看到创建发送转换过程都在子线程中,而最后的回调是在主线程中

整个过程笔者整理成一张图,一步一步来跟进分析

RxJava 2.x 之图解创建、订阅、发射流程

创建过程

  • 第一步:通过create操作符创建了一个ObservableCreate类型的Observable,由于是基于匿名内部类创建的,因此持有的是实现了ObservableOnSubscribe接口的HomeActivity实例

RxJava 2.x 之图解创建、订阅、发射流程

  • 第二步:通过subscribeOn操作符创建了一个ObservableSubscribeOn类型的Observable,且其内部的source持有上个步骤的ObservableCreate实例

RxJava 2.x 之图解创建、订阅、发射流程

  • 第三步:通过map操作符创建了一个ObservableMap类型的Observable,且其内部持有上个步骤传入的ObservableSubscribeOn实例

RxJava 2.x 之图解创建、订阅、发射流程

  • 第四步:通过observeOn操作符创建了一个ObservableObserveOn类型的Observable,且其内部持有上个步骤的ObservableMap实例

RxJava 2.x 之图解创建、订阅、发射流程

  • 第五步:通过subscribeWith方法完成订阅,由于是基于匿名内部类创建的,因此传入的实际上是实现了ResourceObserverHomeActivity实例

RxJava 2.x 之图解创建、订阅、发射流程

订阅过程

上述的几个步骤其实已经完成的基本的创建过程了,最后我们拿到的实际是ObservableObserveOn的实例,下面开始订阅流程。

  • 第一步:subscribeWith方法,传入的observer是实现了ResourceObserver接口HomeActivity实例,通过subscribeActual发起订阅,内部实际调用的是source.subscribe方法,由于source持有的是上面传入的ObservableMap实例,因此这一步骤实际调用的是,ObservableMap实例中的subscribe方法,传入的参数就是ObserveOnObserver实例(构造参数主要是实现了ResourceObserver的实例即:HomeActivity)

RxJava 2.x 之图解创建、订阅、发射流程

  • 第二步:进入ObservableMap实例subscribe方法中,通过subscribeActual发起订阅,实际调用的是source.subscribe方法,传入的是MapObserver实例(构造参数为之前传递的ObserveOnObserver实例),由于source持有的是ObservableSubscribeOn的实例,因此最终调用的其实是ObservableSubscribeOn实例中的subscribe方法

RxJava 2.x 之图解创建、订阅、发射流程

  • 第三步:进入ObservableSubscribeOn实例subscribe方法中,通过subscribeActual发起订阅,完成MapObserver实例对SubscribeOnObserver的订阅,接着由NewThreadScheduler线程调度器完成对应的任务(该任务的执行是在线程中执行的),SubscribeTask实现了Runnable接口,最终会回调run方法,执行source.subscribe方法,这里的source持有的就是最开始的ObservableCreate实例

RxJava 2.x 之图解创建、订阅、发射流程

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //这里的s就是上个步骤的MapObserver实例
        s.onSubscribe(parent);
        //这里的scheduler就是我们最开始指定的Schedulers.newThread 即NewThreadScheduler线程调度器
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

RxJava 2.x 之图解创建、订阅、发射流程

  • 第四步:进入ObservableCreate实例subscribe方法中,通过subscribeActual发起订阅,这里的source持有的是HomeActivity实例,直接调用subscribe方法,传入参数是构建的最顶层的发射器CreateEmitter实例

RxJava 2.x 之图解创建、订阅、发射流程

  • 第五步:上述的几个过程实际已经完成了订阅的过程,最后经过层层传递,持有的最顶层的是CreateEmitter实例,即我们最终的被观察者

发射过程

上述的过程已经完成了订阅过程,在最后订阅完成之后,最终会通过source.subscribe方法,其实就是调用HomeActivity实例的subscribe方法,完成元素发射

@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    for (int i = 0; i < 3; i++) {
        emitter.onNext(i);
    }
    emitter.onComplete();
    Log.d(TAG, "subscribe " + Thread.currentThread().getName());
}

我们在最顶层的被观察者里通过ObservableEmitter实例onNext方法完成元素的发射,最终又会通过一层一层的Observer转发到最原始的实现了ResourceObserver接口观察者中来

注意:

  • 这里的被观察者里的所有发射过程实际上都是在NewThreadScheduler线程调度器分配的线程里完成的
  • 发射的元素被传递到下层的ObservableObserveOn类中的ObserveOnObserver实例onNext方法,实际执行的是HandlerScheduler.HandlerWorkerschedule方法,最终就是通过我们持有的主线程的handler切换到主线程中

RxJava 2.x 之图解创建、订阅、发射流程

小结

整个创建过程订阅过程发射过程看起来山路十八弯,但是如果你一步一步跟进查看,会发现整个流程实际上是很清晰的,整个过程起点终点很明确,
而中间产生的一系列ObservableObserver你都可以看作是代理类,用来转发订阅以及最终的元素发射