Rxjava2(三)合并操作符
1.concat
private void concat() { final Integer[] items={1,2,3,4}; Observable.concat(Observable.just(1,2,3),Observable.just(4,5)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======accept==="+integer); } }); }
作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行执行 注意:concat()
组合被观察者数量≤4个
2.concatArray
private void concatArray() { final Integer[] items={1,2,3,4}; Observable.concatArray(Observable.just(1,2,3), Observable.just(4,5), Observable.just(6,7), Observable.just(8,9), Observable.just(10,11), Observable.just(12,13)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======accept==="+integer); } }); }
- 作用
组合多个被观察者一起发送数据,合并后 按发送顺序串行执行 注意:concatArray()
则可>4个
3. merge
private void merge() { Observable.merge(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LogUtils.syso("====aLong===="+aLong); } }); }
- 作用
组合多个被观察者一起发送数据,合并后 按时间线并行执行
组合被观察者的数量,即merge()
组合被观察者数量≤4个
4.mergeArray
private void mergeArray() { Observable.mergeArray(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(7,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(10,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(13,3,2,1, TimeUnit.SECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LogUtils.syso("====aLong===="+aLong); } }); }
- 二者区别:组合被观察者的数量,即
merge()
组合被观察者数量≤4个,而mergeArray()
则可>4个 - 区别上述
concat()
操作符:同样是组合多个被观察者一起发送数据,但concat()
操作符合并后是按发送顺序串行执行
5.concatDelayError
测试结果:第1个被观察者发送Error事件后,第2个被观察者则不会继续发送事件
那么如果希望onError事件推迟到其他观察者发送事件结束
private void concatArrayDelayError() { Observable.concatArrayDelayError( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onError(new NullPointerException()); e.onComplete(); } }),Observable.just(4,5,6)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====accept=====" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { LogUtils.syso("=====accept=throwable====" + throwable.getMessage()); } }); }
达到预期的效果
小结:
6.zip (合并多个事件)
该类型的操作符主要是对多个被观察者中的事件进行合并处理
作用:
合并 多个被观察者(Observable
)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送
假设这样一种场景,我们利用github api开发一个app,在user界面,我既要请求user基本信息,又要列举user下的event数据,为此,我准备使用Retrofit来做网络请求。
虽然在后台有两次请求,但是在前台,我们希望用户打开这个页面,然后等待加载,然后显示。用户只有一次等待加载的过程。所以说,我们需要等待这两个请求都返回结果了,再开始显示数据。
怎么办?自己写判断两个都加载已完成的代码吗?逻辑好像也不是很复杂,但是代码看起来就没有那么高大上了啊。
其实既然你都用过了还有,那么直觉上你应该意识到也许RxJava可以解决这个问题。没错,就是RxJava,使用zip操作符。
zip( ):使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
private void zip() { Observable.zip(Observable.just(1, 2, 3), Observable.just("one", "two", "three"), new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer+s; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { LogUtils.syso("======返回的结果======="+s); } }); }
7.conbineLatest
- 作用
当两个Observables
中的任何一个发送了数据后,将先发送了数据的Observables
的最新(最后)一个数据 与 另外一个Observable
发送的每个数据结合,最终基于该函数的结果发送数据 -
与
Zip()
的区别:Zip()
= 按个数合并,即1对1合并;CombineLatest()
= 按时间合并,即在同一个时间点上合并
private void combineLatest() { Observable.combineLatest(Observable.just(1, 2, 3), Observable.just("one", "two", "three"), new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer+s; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { LogUtils.syso("======返回的结果======="+s); } }); } }
8.reduce
- 作用
把被观察者需要发送的事件聚合成1个事件 & 发送
private void reduce() { Observable.just(1, 2, 3) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.e("REDUCE", "本次计算的数据是: "+integer +" 加"+ integer2); return integer+integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result===="+integer); } }); }
9.collect
作用
将被观察者Observable
发送的数据事件收集到一个数据结构里
private void collect() { Observable.just(1,2,3,4) .collect( // 1. 创建数据结构(容器),用于收集被观察者发送的数据 new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { return new ArrayList<>(); } },// 2. 对发送的数据进行收集 new BiConsumer<List<Integer>, Integer>() { /** * mList 容器, * integer 后者数据 * * */ @Override public void accept(List<Integer> mList, Integer integer) throws Exception { mList.add(integer); } }).subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Exception { LogUtils.syso("====result======"+integers); } }); }
10.startWith/startWithArray
作用
在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
Observable.just(1,2,3,4) .startWith(5) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=======result======="+integer); } });
Observable.just(1,2,3,4) .startWithArray(7,8,9) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======resutl======="+integer); } });
11.count
作用
统计被观察者发送事件的数量
private void count() { Observable.just(1,2,3,4) .count() .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LogUtils.syso("========发送事件的个数======"+aLong); } }); }