Rxjava2(四)功能操作符
1.delay
作用
使得被观察者延迟一段时间再发送事件
private void delay() { Observable.just(1,2,3,4) .delay(5,TimeUnit.SECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("====延时发送reslut===="+integer); } }); }
在事件的生命周期中操作
- 需求场景
在事件发送 & 接收的整个生命周期过程中进行操作
如发送事件前的初始化、发送事件后的回调请求等
2.doOnEach
当observable每发送一次数据事件就会调用一次,其中包含onNext onError onComplete
Observable.just(1,2,3,4) .doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { LogUtils.syso("==================="); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result====="+integer); } }); }
3.doOnNext
执行onNext()之前调用
Observable.just(1,2,3,4) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======onNext前调用======"); } }). subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result====="+integer); } });
4.doAfterNext后调用
Observable.just(1,2,3,4) .doAfterNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======onNext后调用======"); } }). subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result====="+integer); } }); }
5.doOnComplete
Observable正常发送事件完毕后调用
Observable.just(1,2,3,4) .doOnComplete(new Action() { @Override public void run() throws Exception { LogUtils.syso("======Observable正常发送事件完毕后调用=========="); } }). subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result====="+integer); } });
6.doOnError
发送错误事件时候调用
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new NullPointerException()); e.onComplete(); } }) .doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { LogUtils.syso("===========发送错误事件的时候调用===="+throwable.getMessage()); } }). subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result====="+integer); } });
其他do操作符
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } }) // 6. 观察者订阅时调用 .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { LogUtils.syso("=====doOnSubscribe====="); } }) // 7. Observable发送事件完毕后调用,无论正常发送完毕 / 异常终止 .doAfterTerminate(new Action() { @Override public void run() throws Exception { LogUtils.syso("=====doAfterTerminate====="); } }) // 8. 最后执行 .doFinally(new Action() { @Override public void run() throws Exception { LogUtils.syso("=====doFinally===最后执行=="); } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result====="+integer); } });