RxJava 2.0从零学起
RxJava 火了挺久,一直没怎么去用,今天抱着学习的心态,一块来研究下吧,看看能不能解决我们实际开发中的问题,不要为了用而用。
在听说RxJava的时候,肯定还听说RxAndroid,这里要先声明一下,RxAndroid没几行代码。主要是加了个Scheduler,AndroidSchedulers.mainThread()来方便在Android上使用RxJava。所以使用的时候,还是需要两个都一起依赖的。
RxAndroid 的地址:https://github.com/ReactiveX/RxAndroid
RxJava的地址:https://github.com/ReactiveX/RxJava
1 ) 首先看下最新版本的可以点下Code,里面有个releases
可以看到最新的是 2.1.14-RC1
2 ) 确定最新版本之后,就开始在gradle里面添加依赖:
compile "io.reactivex.rxjava2:rxjava:2.1.14-RC1"
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
3 ) 环境准备好了,就来跑一下代码。先看下官方的小示例:
Observable.just("one", "two", "three", "four", "five") .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(/* an Observer */);
将这段代码写到我的页面当中
运行成功,但是什么效果都没有。先来解释一下这段代码:
这个的原型实际上是用了观察者模式,最简单的观察者模式就是给button通过setOnClickListener设置点击事件。当用户点击button的时候,button通过之前设置的事件监听,从而响应了OnClick方法。 可以理解为是button 去通知OnClick有事件发生,然后OnClick就去干活啦。
回到代码:
Observable 就是那个button
subscrible 就是那个setOnClickListener
所以现在知道上面的代码为什么没有效果吧,因为你没有处理一个类似OnClick的方法,就好比你给按钮设置了点击事件,也设置了监听,然后你的OnClick没写肯定什么事情都不会发生了。
然后上面写了subscrible ()需要传一个Observer对象,就理解为让你去实现OnClick事件的具体操作吧。
所以代码修改如下:
这段代码,可以看成Observable 突突突发射了"one", "two", "three", "four", "five" 这5个字符串,然后发射到了onNext方法里面。onError是发送失败的情况下会回调,onComplete是所有都发射完成之后会回调。
增加调试日志之后再来看:
运行结果:可以看出当Observable 通过subscrible 订阅了Observer之后,就调用了onSubscribe(别问我怎么知道的,因为这个单词TM就是订阅的意思!!!)然后就开始突突突调用onNext方法,把just里面的每一个值都给推过来。然后执行结束了,就调用了onComplete说明完成了。
再补充一下这个onSubscribe方法的参数Disposable,我们可以输出一下
Log.v("zyl","onSubscribe Disposable:"+d.isDisposed());
看到结果,这个值为false,这个单词翻译过来是一次性的意思,isDisposed 为flase,那就不是一次性的意思,其实这个是代表事件订阅,
由于不是一次性,所以后面的onNext才可以不断去接收。
接下来,我们可以尝试在某种情况下改为true,看看效果,代码如下:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("one"); emitter.onNext("two"); emitter.onNext("three"); emitter.onNext("four"); emitter.onComplete(); //结束 } }).subscribe(new Observer<String>() { Disposable disposable; @Override public void onSubscribe(Disposable d) { disposable = d; } @Override public void onNext(String s) { if("three".equals(s)){ disposable.dispose(); } Log.v("zyl", "onNext:" + s); } @Override public void onError(Throwable e) { Log.v("zyl", "onError"); } @Override public void onComplete() { Log.v("zyl", "onComplete"); } });
输出结果:可以看到,当为three之后,我们调用dispose取消订阅,则后续事件不再接收了。
值得注意的是,订阅了事件后没有及时取订,容易造成内存溢出。这个后续再讲
好了,到这里Rx基本算入门了,其实just只是其中一个操作符,只是为了方便我们调用的一种写法,谁也不会没事只突突突发几个字符串吧。下面来看一下标准的写法:
Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> emitter) throws Exception { emitter.onNext("one"); emitter.onNext("two"); emitter.onNext("three"); emitter.onNext("four"); emitter.onComplete(); //结束 emitter.onNext("five"); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { Log.v("zyl","onSubscribe Disposable:"+d.isDisposed()); } @Override public void onNext(Object o) { Log.v("zyl","onNext:"+o); } @Override public void onError(Throwable e) { Log.v("zyl","onError:"); } @Override public void onComplete() { Log.v("zyl","onComplete"); } }); }
这段代码其实很好理解:通过Observable.create 创建了发射器,分别发射了6个事件。然后下面就会依次接受事件。可以看一下输出:
可以看到,其实只接收了5个事件,"five"没有发送过来,是因为一旦发射了onComplete事件,下游就会停止接收了。
当然了,下游接收的时候直接new Observer()的需要重写几个方法,比较累赘,所以他提供了一种更简单的方法:可以用 Consumer 对象来代替 Observer 对象
代码如下:
Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> emitter) throws Exception { emitter.onNext("one"); emitter.onNext("two"); emitter.onNext("three"); emitter.onNext("four"); emitter.onComplete(); //结束 emitter.onNext("five"); } }).subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.v("zyl","accept:"+o); } }); }
Consumer很好理解,就是消费者的意思,其实就是相当于只重写了之前的onNext方法,只是这里改为accept,可以看一下输出结果:
Ok,截止到目前,再来回顾一下:
1.我们先是用了just操作符,看到了很简单的发射数据方式
2.我们认识了通过create的标准写法方式
3.通过Consumer去替代Observer,这种写法虽然更简洁,但就没有Observer那么强大。
接下来,为了体会到他更强大的地方,我们必须多认识几个操作符,因为光知道一个just还不够强大,多认识几个之后,以后实战才不会伸展不出手脚。
接下来,继续学习除just之外的操作符:map
先来看看源码对map的解释:
/** * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and * emits the results of these function applications. * <p> * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png" alt=""> * <dl> * <dt><b>Scheduler:</b></dt> * <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd> * </dl> * * @param <R> the output type * @param mapper * a function to apply to each item emitted by the ObservableSource * @return an Observable that emits the items from the source ObservableSource, transformed by the specified * function * @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a> */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
乍一看好像很复杂,不用慌。我把要点提取出来:
上面的意思就是说调用map的时候,需要传一个Function对象进来,这个Function对象有两个参数类型,一个是T,一个是R。
R:代表的是输出类型,上面好像没有解释T,实际T是输入类型。合起来的意思就是当发射器每次调用onNext方法发射时候,会先经过map指定的这个Function,同时你要把输入类型是什么,输出类型是什么告诉他。下面就来写个简单的例子:
比如我现在onNext方法每次发射一个数字,然后要把在这个数字后面加个人民币符号,然后当成字符串输出,那么可以这样做。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onComplete(); //结束 emitter.onNext(5); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer+"¥"; } }).subscribe(new Consumer<String>() { @Override public void accept(String str) throws Exception { Log.v("zyl","accept:"+str); } }); }
来看下输出结果:
看到这里,你应该联想一下平时项目中的一些应用场景是不是可以通过它来实现?
接下来介绍一个更强大的操作符,Zip,在实际应用场景中,我们往往会从两个地方获取数据,再合并到一个地方输出。比如从A接口获取数据1234,从B接口获取字符串ABCDE,然后在合并到一个地方组合输出,就用到zip。
先来看下方法调用:
@SuppressWarnings("unchecked") @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T1, T2, R> Observable<R> zip( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper) { ObjectHelper.requireNonNull(source1, "source1 is null"); ObjectHelper.requireNonNull(source2, "source2 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2); }
上面的代码总结下来就是zip我需要传三个参数,前面两个都是Observable,最后一个是Bifunction。前面两个就是刚说的一个用来发射12345的,第二个就是用来发射ABC的,然后通过最后一个方法Bifunction将数据组合到一起。这个Bifunction有三个参数,第一个是12345的类型,那这里就是 Integer,第二个是ABC的类型,那就是String,最后一个是组合输出后的类型,那就是String了。当然 ,如果是在现实场景,往往是两个对象数据,组合成一个特殊对象数据,这里为了简单演示,就用数字跟字符串了。
看看代码怎么写,先写两个Observable,跟以前的写法差不多,我这里封装到两个方法里面,方便调用:
private Observable<Integer> getIntegerObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); Log.v("zyl","emitter:1"); emitter.onNext(2); Log.v("zyl","emitter:2"); emitter.onNext(3); Log.v("zyl","emitter:3"); emitter.onNext(4); Log.v("zyl","emitter:4"); emitter.onNext(5); Log.v("zyl","emitter:5"); } }); }
private Observable<String> getStringObservable() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("A"); Log.v("zyl","emitter:A"); emitter.onNext("B"); Log.v("zyl","emitter:B"); emitter.onNext("C"); Log.v("zyl","emitter:C"); } }); }
这两个事件源准备好了,接下里就用zip组合了:
Observable.zip(getIntegerObservable(),getStringObservable(), new BiFunction<Integer,String, String>() { @Override public String apply( Integer integer,String s) throws Exception { return s + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.v("zyl","accept:"+s); } });
代码非常简单。将前面两个方法作为参数传进去就好了。来看看输出结果:
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:1
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:2
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:3
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:4
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:5
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: accept:A1
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:A
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: accept:B2
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:B
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: accept:C3
05-25 10:12:49.172 21434-21434/cm.jkinvest.com.rxjavaproject V/zyl: emitter:C
看到这数据,可能会有点蒙圈,感觉是事件12345一块输出完再输出组合数据,再输出事件ABC,感觉毫无规律,实际上这是因为在同个线程的原因。但是我们在实际应用中,其实只关心我们的zip合并后的数据,这里应该筛选看,比如只看zip接收:
可以看到确实达到我们想要的效果 组合A1、B2、C3
看到这里,可能会想45去哪了,其实这是zip的一个特点,他其实是分别从两个发射数据源的Observable取数据,当发现取ABC之后,已经没有了,那他也就不去另外的Observable取了。所以只有两两配对的组合结果。
再总结看看上面的数据,其实12345全部发射了、ABC也全部发射了、但zip组合只接收了A1、B2、C3。