RxJava 利用zip()实现两个请求合为一个请求

本人是做设备相关的应用开发,在项目中遇到了一种奇葩的设计:

APP发送某功能指令给设备,设备收到后返回一个收到指令的响应,表示设备已经收到指令了,等设备完成动作后,会再次返回一个响应,表示操作的结果。即一次请求,两次响应。

为了更加明白的阐述问题,用一个图来进行说明:
RxJava 利用zip()实现两个请求合为一个请求

只要APP和设备的连接没有断开,一般情况下第一个响应很快就会收到,第二个响应相对久一点返回。此外,第一个响应不是使用EventBus post() 过来的,而第二个响应是EventBus post() 过来的。有的朋友说也可以在@Subscribe方法中更新界面,当然这样也可以。但是在此种情况下,EventBus虽然能解耦,也会使业务代码分散,难以阅读和维护。业务逻辑使用RxJava实现,用过RxJava的人应该都有体会,调用链断开是比较难受的。

为了解决这个问题,即使业务流不那么分散,我想到了zip() 操作符,我把整个请求流程看作两个请求,然后合成一个请求来写业务流。不啰嗦,上代码。

public class MainActivity extends AppCompatActivity {

    Observable<Integer> observable1;
    PublishSubject<Integer> observable2;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        EventBus.getDefault().register(this);
        // 模拟第一个请求,APP发送给设备的请求指令
        observable1 = Observable.just(1);
        // 模拟第二个请求
        observable2 = PublishSubject.create();
        // 将两个请求合为一个请求
        Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
				// 两个结果都返回后,才算整个流程结束
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // 整个流程的结果
				Log.i("Test", "result = " + integer);
            }
        });

        // 延时,表示设备正在处理,然后发送结果给APP
        Log.i("Test", "observable2 getting");
        Observable.timer(10, TimeUnit.SECONDS)
                .map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        Log.i("Test", "observable2 got");
                        return new String("sunlg");
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String o) throws Exception {
                        Log.i("Test", "observable2 completed");
                        EventBus.getDefault().post(3);
                    }
                });
    }

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onPost(Integer integer){
        observable2.onNext(integer);
        observable2.onComplete();
    }

    @Override
    protected void onDestroy() {
        EventBus.getDefault().unregister(this);
        super.onDestroy();
    }
}

在代码中,我为什么要使用PublishSubjectSuject既是一个Observable,也是一个Observer,当然我使用它的原因是创建Subject时,只需要调用静态方法create(),而可以先不用写具体的数据发射逻辑。想了解Suject的相关用法可以去下面链接看看:

  1. Rxjava2.0中的 Subject
  2. Understanding RxJava Subject — Publish, Replay, Behavior and Async Subject