RxJava 利用zip()实现两个请求合为一个请求
本人是做设备相关的应用开发,在项目中遇到了一种奇葩的设计:
APP发送某功能指令给设备,设备收到后返回一个收到指令的响应,表示设备已经收到指令了,等设备完成动作后,会再次返回一个响应,表示操作的结果。即一次请求,两次响应。
为了更加明白的阐述问题,用一个图来进行说明:
只要APP和设备的连接没有断开,一般情况下第一个响应很快就会收到,第二个响应相对久一点返回。此外,第一个响应不是使用EventBus post() 过来的,而第二个响应是EventBus post() 过来的。有的朋友说也可以在@Subscribe方法中更新界面,当然这样也可以。但是在此种情况下,EventBus虽然能解耦,也会使业务代码分散,难以阅读和维护。业务逻辑使用RxJava实现,用过RxJava的人应该都有体会,调用链断开是比较难受的。
为了解决这个问题,即使业务流不那么分散,我想到了zip() 操作符,我把整个请求流程看作两个请求,然后合成一个请求来写业务流。不啰嗦,上代码。
public class MainActivity extends AppCompatActivity {
Observable<Integer> observable1;
PublishSubject<Integer> observable2;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus.getDefault().register(this);
// 模拟第一个请求,APP发送给设备的请求指令
observable1 = Observable.just(1);
// 模拟第二个请求
observable2 = PublishSubject.create();
// 将两个请求合为一个请求
Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
// 两个结果都返回后,才算整个流程结束
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 整个流程的结果
Log.i("Test", "result = " + integer);
}
});
// 延时,表示设备正在处理,然后发送结果给APP
Log.i("Test", "observable2 getting");
Observable.timer(10, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
Log.i("Test", "observable2 got");
return new String("sunlg");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String o) throws Exception {
Log.i("Test", "observable2 completed");
EventBus.getDefault().post(3);
}
});
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onPost(Integer integer){
observable2.onNext(integer);
observable2.onComplete();
}
@Override
protected void onDestroy() {
EventBus.getDefault().unregister(this);
super.onDestroy();
}
}
在代码中,我为什么要使用PublishSubject?Suject既是一个Observable,也是一个Observer,当然我使用它的原因是创建Subject时,只需要调用静态方法create(),而可以先不用写具体的数据发射逻辑。想了解Suject的相关用法可以去下面链接看看: