RxJava不重复执行

RxJava不重复执行

问题描述:

短篇小说结合观测: 我有一个情况我有2个观测量有一个目的:RxJava不重复执行

  • 他们收到了一些数据
  • 他们返回修改后的数据
  • 抛出如果数据无法处理,则会出错

它们各自负责处理不同类型的数据。另外,我想在两个数据都被处理时做一些事情。

我目前最好的实现方式如下,这些都是我的观测量:

Single<BlueData> blueObservable = Single.create(singleSubscriber -> { 
     if (BlueDataProcessor.isDataValid(myBlueData)) { 
      singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData)); 
     } 
     else { 
      singleSubscriber.onError(new BlueDataIsInvalidThrow()); 
     } 
    }); 

    Single<RedData> redObservable = Single.create(singleSubscriber -> { 
     if (RedDataProcessor.isDataValid(myRedData)) { 
      singleSubscriber.onSuccess(RedDataProcessor.process(myRedData)); 
     } 
     else { 
      singleSubscriber.onError(new RedDataIsInvalidThrowable()); 
     } 
    }); 

    Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable, 
      (blueData, redData) -> PurpleGenerator.combine(blueData, redData)); 

我也有以下订阅:

blueObservable.subscribe(
      result -> { 
       saveBlueProcessStats(result); 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 

    redObservable.subscribe(
      result -> { 
       saveRedProcessStats(result); 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 


    composedSingle.subscribe(
      combinedResult -> { 
       savePurpleProcessStats(combinedResult) 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 

我的问题: 蓝色&红色数据处理两次,因为这两个订阅都再次运行,我订阅了使用Observable.zip()创建的组合observable。

如何在不同时运行两次操作的情况下发生此行为?

这在1x中的Single中是不可能的,因为没有ConnectableSingle的概念,因此也没有Single.publish的概念。您可以通过2.x和RxJava2Extensions库实现该效果:

SingleSubject<RedType> red = SingleSubject.create(); 
SingleSubject<BlueType> blue = SingleSubject.create(); 

// subscribe interested parties 
red.subscribe(...); 
blue.subscribe(...); 

Single.zip(red, blue, (r, b) -> ...).subscribe(...); 

// connect() 
blueObservable.subscribe(blue); 
redObservable.subscribe(red); 
+0

我会尽快实施此解决方案,只要我可以将此项目升级到rxjava2。不过,我很高兴用这个解决方案将它标记为正确的答案。谢谢! – dbar