攀岩观火之RxJava

前言

对于学习RxJava的一些笔记,加上一些自己的理解,以及一些目前仍存在的疑问

主要参考下面的四篇文章:
1.扔物线 /给Android开发者的RxJava详解 https://gank.io/post/560e15be2dca930e00da1083#toc_1
2.CSDN大头鬼译文/深入浅出RxJava系列:https://blog.csdn.net/lzyzsd/article/details/41833541
3.CSDN下一个五年/理解RxJava的lift :https://blog.csdn.net/a910626/article/details/79316121
4.简书野生安卓兽/lift原理解析:https://www.jianshu.com/p/b15d9b3e194e

定义

RxJava在Github的介绍:

RxJava : a library for composing asynchronous and event-based programs using observable sequences for the Java VM
RxJava是一个基于事件流、实现异步操作的库

作用

实现异步操作,类似于 Android 中的 AsyncTask、Handler的作用

特点

简洁优雅,程序越复杂,这点体现的越明显。它会把一系列复杂的或者嵌套的调用转换成一个链式调用

原理

扩展的观察者模式

RxJava的异步实现,是通过一种扩展的观察者模式来实现的,那么,我们先来了解一下经典的观察者模式

观察者模式

观察者模式面向的需求是:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间作出反应。
攀岩观火之RxJava
把这张图片中的概念抽象出来(Button->被观察者 、 OnClickListener->观察者 、 setOnClickListener()->订阅 、 onClick()->事件 ),这样就由专用的观察者模式(只用于监听控件点击)转变成了通用的观察者模式
攀岩观火之RxJava

而RxJava作为一个工具库,使用的就是通用形式的观察者模式(TODO1:不是说扩展的观察者模式么)

RxJava的观察者模式

四个基本概念

  • Observable
  • Observer
  • subscribe
  • event

Observable和Observer通过subscribe()实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer

与传统的观察者模式不同,RxJava的事件回调方法除了普通事件 onNext()(相当于 onClick()/onEvent())之外,还定义了两个特殊的事件: onCompleted()和onError()

基本实现

基于以上的概念,RxJava的基本实现主要有三点:

创建Observer

Observer即观察者,它决定了事件触发的时候将有怎样的行为(而Observable则决定了什么时候调用以及调用顺序),RxJava中的Observer接口的实现方式:

Observer<String> observer = new Observer<String>(){
	@Override
	public void onNext(String s){
		Log.d(tag,"Item: "+s);
	}

	@Override
	public void onCompleted(){
		Log.d(tag,"Completed!");
	}

	@Overridepublic void onError(Throwable e){
		Log.d(tag,"Error!");
	}
}

除了 Observer 接口之外,RxJava还内置了一个实现了Observer的抽象类:Subscribe。Subcriber对Observer接口进行了一些扩展,但它们的基本使用方式是完全一样的,代码我就不贴了,也是实现这三个方法。

Subscriber<String> subscriber = new Subscriber<String>(){
	````onNext/onCompleted/onError
}

不仅在使用方式上一样,实质上,在RxJava的subscribe过程中,Observer也总是会被转换成一个Subscriber再使用。所以,如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。
它们的区别主要在于两点:

  • onStart():这是Subscriber增加的方法;它的调用时机是,在subscribe刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,如数据的清零或重置。这是一个可选方法,它的默认实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),onStart就不适用了,因为它总在subscribe所发生的线程(TODO2 注册或者订阅动作发生的线程,不一定是主线程)被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubcribe()方法,具体可以在后面的文章中看到。
  • unsubscribe():这是 Subscriber所实现的另一个接口Subscription的方法
创建 Observable

Observable即被观察者,它决定什么时候触发事件以及触发怎样的事件(onNext还是onComplete/onError)。
RxJava使用create()方法来创建一个 Observable。并为他定义事件触发规则:

Observable observable = Observable.create(new Observable.OnSubscribe<String>{
	@Override
	public void call(Subscriber<? super String> subscriber){
        // 这里是立即发送数据,因为 这几个字符串是我们呢现在就有的,也可以先请求数据,当数据请求到了,再发送数据
        // 例如加上这么一句
        // str = getStr(); //getStr是一个耗时操作,所以代码需要等待 str获取到了才发送,getStr 可以是网络请求也可以是IO请求
		subscriber.onNext("Hello");
		subscriber.onNext("Hi");
		subscriber.onNext("Aloha");
		subscriber.onCompleted();
	}
});
// TODO3 对于这段代码我有一点疑问,call方法应该是在事件被触发的时候被Observabler调用,而学习资料上都说,当观察者Observable被订阅的时候,OnSubcribe的call会自动被调用

下面解释了为什么会立即发送这些事件,而不需要触发条件

这个例子很简单:事件的内容是字符串,而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(例如网络请求的结果在请求返回之前是未知的);所有事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的。总之,这个例子看起来毫无实用价值。但这是为了便于说明,实质上只要你想,各种各样的事件发送规则你都可以自己来写。至于具体怎么做,后面都会讲到,但现在不行。只有把基础原理先说明白了,上层的运用才能更容易说清楚。

事件队列(即发送事件的规则)。create()方法是 RxJava最基本的创造事件序列的方法,基于这个方法,RxJava还提供了一些方法用来创建事件序列,例如:

1.just(T…) // 直接传入对象

Observable observable = Observable.just("Hello","Hi","Aloha")
equals:
onNext("Hello")->onNext("hi")->onNext("Aloha")->onCompleted()

2.from(T[]) / from(Iterable<?extends T>) 将传入的 Iterable ,传入数组或者链表拆分成具体对象后,依次发送出来,再拆分成一个个的对象

String[] words = {"Hello","Hi","Aloha"}
Observable observable = Observable.from(words)
同上一样的事件序列。这些事件序列也就是调用规则,由被观察者Observable决定
Subscribe(.vt 订阅)

创建了Observable和Observer之后,再用subscribe()方法将他们关联起来(让Observable持有Observer的引用)

observable.subscribe(observer) **or**
observable.subscribe(subscriber)

有人可能注意到,subscribe() 这个方法有点怪,它看起来像是顺序搞反了,成了[observable]订阅了[observer],读起来像是杂志订阅了读者,让人别扭。这种设计单纯是为了API的流式设计,牺牲了这点

我们来看一下 Observable.subscribe(Subscriber)的内部实现(仅核心代码)

public Subscription subscribe(Subscriber subscriber){
	subscriber.onStart(); //先执行观察者的一些初始化方法
	OnSubscribe.call(subscriber); //一个接口,内部定义了当subcribe动作执行的时候的一系列动作,也就是发送事件
	return subscriber;
}

可以看到,subscribe()做了三件事:
1.调用 SUbscriber.onStart() // 一个可选的准备方法
2.调用Observable中的OnSubscribe.call(Subscriber)。在这里,事件的发送逻辑开始运行(onNext(事件))
3.将传入的SUbscriber作为Subscription返回,这是为了方便unsubscribe()
如图:

除了 subscribe(Observer)和subscribe(Subscriber),subscribe()还支持不完整定义的回调,RxJava会自动根据定义创建出Subscriber,形式如下:

Action1<String> onNextAction = new Action1<String>(){
	@Override
	public void call(String s){
		Log.d(tag,s);
	}
}

Action1<String> onErrorAction = new Action1<Throwable>(){
    @Override
    public void call(Throwable throwable){
        // Error handling
    }
}

Action0 onCompletedAction = new Action0(){
    @Override
    public void call(){
        //.....
    }
}

可以看到 Action后面的数字代表所需要的参数个数,error和next都是1,而completed是0

小结:

要弄清楚什么是事件,什么是发送事件,事件就是上面屡屡用到的字符串,也就是 onNext方法中的参数,将事件传入onNext方法实际上就是发送事件,注意,是传入的这个动作就是发送事件,而onNext则是对事件的处理,不能把笼统地说onNext称之为发送事件,这是我个人的理解;这就好比说,onNext是发送事件,那什么是执行事件呢?所以懂了吧哈哈.

举个例子:

1.打印字符串数组

String[] names = {"Hello","yudan and ","zhuzi"};
 Observable.from(names)
 	.subscribe(new Observer<String>{
 		....
 		public void onNext(String s){
 			log.d(TAg,"onNext: "+s);
 		}
 		....
 	});

Output:
onNext: Hello
onNext: yudan and
onNext: zhuzi
onCompleted

注意事件与事件流

2.由id取得图片并显示
由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错

iv = findViewById(R.id.img_view);
        Observable.create(new ObservableOnSubscribe<Drawable>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Drawable> emitter) throws Exception{
                Drawable drawable = getResources().getDrawable(R.mipmap.ic_launcher);
                Drawable drawable1 = getResources().getDrawable(R.mipmap.source_leifeng);
                emitter.onNext(drawable);
                emitter.onNext(drawable1);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Drawable>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Drawable drawable) {
                Log.d(TAG, "onNext: ");
                iv.setImageDrawable(drawable);
            }

            @Override
            public void onError(Throwable e) {
                Toast.makeText(SetImgActivity.this, "onError: ",Toast.LENGTH_SHORT)
                        .show();
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
        //上面是基于RxJava2.0

在RxJava的默认规则里,事件的发出和消费都是在同一线程的,也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式,观察者模式本身的目的就是后台处理,前台回调,因此异步对于RxJava是至关重要的,而要实现异步,则需要用到 RxJava 的另一个概念:Scheduler

线程控制–Scheduler/01 调度器

1.Schedule的API
在RxJava中,通过Schedule来制定每一段代码应该运行在什么样的线程

  • Schedulers.immediate():直接运行在当前线程,相当于不指定线程,这是默认的Scheduler
  • Schedulers.newThread():总是启用新线程,并在新线程执行操作
  • Schedulers.io():I/O操作所使用的Scheduler。行为模式和newThread()差不多,区别在于 io()的内部实现是一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率,不要把计算工作放在io()中,可以避免创建不必要的线程
  • Schedulers.computation():计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算,它使用固定的线程池(大小为CPU核数)
  • Android还有一个专用的AndroidSchedulers.mainThread() //需要添加上 rxandroid依赖
    有了这几个Scheduler,就可以使用 subscribeOn()和observerOn()两个方法来对线程进行控制了
Observable.just(1,2,3,4)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Integer>{
        @Override
        public void call(Integer number){
            Log.d(TAG,"number:"+number);
        }
    })

Scheduler的远离我们呢会放在下一章来讲,它是以变换的原理为基础的

变换

RxJava提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说"RxJava太好用了"的原因;所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者事件序列,来看API
map

Observable.just("imags/logo.png")
    .map(new Func1<String,Bitmap>(){
        @Override
        public Bitmap call(String filePath){
            return getBitmapFromPath(filePath);
        }
    })
    .subscribe(new Action1<Bitmap>(){
        @Override
        public void call(Bitmap bitmap){
            showBitmap(bitmap);
        }
    });

RxJava不仅可以针对事件对象,还可以针对整个事件序列,下面列举一些常见的变化以及分析图

1.map
攀岩观火之RxJava

2.flatMap
攀岩观火之RxJava

这是一个非常有用但是难以理解的变化,flat:扁平化的意思
感觉 flatMap和from有点类似,区别在于from是对数据源处理,而flatmap是对Observable处理

看一段Sample吧

Student[] students = ...
Subscriber<Course> subscriber = new Subcriber<Course>(){
    @Override
    public void onNext(Course course){
        Log.d(tag,course.getName());
    }
    ....
}

Observable.from(students)
    .flatMap(new Func1<Student,Observable<Course>>(){
        @Override
        public Observable<Course> call(Student student){ //传进来的 student已经是单个的student了
            return Observable.from(students.getCourses());
            //这里也可以不对student变换,返回一个 Observable<Student>,就相当于只实现了扁平化,但是你要知道 flatMap = 扁平化+变化(可以返回一样的就相当于没用变换)
        }
    })

扩展:由于可以在嵌套的 Observable中添加异步代码,flatMap也常用于嵌套的异步操作,例如嵌套的网络请求

3.throttleFirst():在每次事件触发后的一定时间间隔内丢弃新的事件。常用作抖动过滤
RxView.clickEvents(button)
.throttleFirst(500,TimeUnit.MILLISECONDS)

此外,RxJava还有很多便捷的方法来实现事件序列的变换,这里就不一一列举了

变换的原理:lift()

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送,而在RxJava内部,它们是基于同一个基础的变换方法:‘lift(Operator)’

首先看一下 ‘lift()’ 的内部实现(仅核心代码)

public <R> Observable<R> lift(Operator<? entends R,? super T> operator){
    return Observable.create(new OnSubscribe<R>(){  //创建一个新的Observable
        @Override
        public void call(Subscriber<R> subscriber){ //这里的subscriber是与上层的Observable对应的
            Subscriber newSubscriber<R,T> = operator.call(Subscriber<R>); // new Subscriber是代理接收者,就是在这里建立起原始Subscriber与新的 Subscriber之间的关系(转换关系)
            //newSubscriber<R,T>.onStart(); 这句先忽略,看关键的两句
            onSubscribe<T>.call(newSubscriber<R,T>); //这个中转的 newSubscriber 订阅上层的Observable(通过调用 onSubscribe.call),直到遇到最*的Observable(事件发射源),这里就是实现了订阅的向上级传递(或者叫做通知上层可以发送消息了)
            // 我们可以看到,这个新构造的Observable<R>,当它被订阅的时候(它本身的onSubscribe<R>执行),它的内部逻辑是,通知原来的Observable<T>(通过调用T层的onSubscribe<T>.call,完成通知),这是向上传递通知的过程
            // 向下发送消息的过程实际上就是不断地调用每一个操作符的中间层 newSubscriber<R,T>的事件函数(onSubscribe.call 的内部就是调用它的事件发送函数),并经过operator的事件转换,传递给下层需要的事件 R;这是事件向下发送的过程
            // 以上的 OnSubscribe.call 方法的逻辑基于这是一个lift操作
            // 理解什么是订阅,订阅就是将一个subscriber对象传递到上层的 OnSubscribe接口的call方法中
            // 上面的构造是在 onSubscribe里面调用onSubscribe,是一个向上传递的过程,也能对应到下图所示,但是原来的 subscriber不具备向上订阅的能力,类型不同,所以需要一个转换,这个转换既完成了向上订阅的可能,又完成了向下传递事件的可能,核心就是对这个subscriber的转换,将Subscriber<R>(接受R事件)转换成 Subscriber<T,R>(接收T事件,并转换成R事件)
        }
    })
}

我们来看一个具体的 Operator 的例子

Integer-->String
observable.lift(new Observable.Operator<String, Integer>() { //这个Operator就是返回 NewSubscriber,它具备订阅 Integer 事件源的能力
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 对象转换为 String 对象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);  //integer转换成string发送给Subscriber<R(String)>
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

流程如***意看 OnSubscribe 和 SUbscriber 的颜色是不一样的,代表它们不是一个层级的)

1.单层lift
攀岩观火之RxJava

2.多层lift
攀岩观火之RxJava

lift流程梳理
1.从流的角度来理解。
2.Observable转化为Observable。
Operator生成了Subscriber<T,R>。
Observable向Subscriber<T,R>发送事件,发送的数据类型是T,通过转换器转化为R,然后发送给Subscriber。
3.流的路线是:
Observable -> Subscriber<T,R> -> Observable -> Subscriber.
4.即,在observable执行了lift操作后,会返回一个新的observable,这个新的observable像代理一样,负责接收原始的observable发出的数据,处理后发送给Subscriber。

中间层返回的虽然是 Observable,但它也是一个 Subscriber(就是因为它内部构造了一个newSubscriber,可以向上订阅,否则无法订阅(即便它持有下层subscriber的引用),因为下层事件和上层不同)