RxJava:如何为.zip两个可观察,然后.merge他们最终。降低聚合所有结果

问题描述:

我有以下代码:RxJava:如何为.zip两个可观察,然后.merge他们最终。降低聚合所有结果

public void foo() { 
    Long[] gData = new Long[] { 1L, 2L }; 

    rx.Observable.from(gData) 
    .concatMap(data -> { 

     rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1); 
     rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2); 


     return rx.Observable.zip(depositObs1, depositObs2, (depositObj1, depositObj2) -> { 

      depositObj1.putNumber("seat_index", data); 
      depositObj2.putNumber("seat_index", data); 

      return rx.Observable.merge(
        rx.Observable.just(depositObj1), 
        rx.Observable.just(depositObj2)); 
     }) 
    }) 
    .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> { 

     int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue(); 
     long payout = ((GmObject) payoutObj).getNumber("payout").longValue(); 
     payoutArr.add(seatIndex, payout); 
     return payoutArr; 
    }) 
    .subscribe(results -> { 
     System.out.println(results); 
    }); 
} 

该代码使用.zip文件来发出来观测,然后将其添加'seat_index'属性并调用.merge以便使用.reduce,以便最终将所有结果聚合到一个ArrayList中。

此代码存在问题:当.reduce处理其输入时,它将其视为Observable而不是GmObject ...什么函数可以从它的Observable wrap中“提取”GmObject?

这样使用rxJava有意义吗?或者有更好的技术?

谢谢!

zip运算符将lambda作为第三个参数。这个lambda是一个2 args函数,它返回一个由args组成的结果的对象。而不是作曲结果的Observable(但是,当然,该对象可以是Observable,但它不是你想要的)。

所以在拨打zip后,您将有一个Observable<Observable<GmObject>>,但您希望Observable<GmObject>

我不认为zip运营商是您正在寻找的运营商。

public void foo() { 
    Long[] gData = new Long[] { 1L, 2L }; 

    rx.Observable.from(gData) 
    .concatMap(data -> { 

     rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1).doOnNext(obj -> obj.putNumber("seat_index", data)); 
     rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2).doOnNext(obj -> obj.putNumber("seat_index", data)); 


     return rx.Observable.merge(depositObs1, depositObs2); 
    }) 
    .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> { 

     int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue(); 
     long payout = ((GmObject) payoutObj).getNumber("payout").longValue(); 
     payoutArr.add(seatIndex, payout); 
     return payoutArr; 
    }) 
    .subscribe(results -> System.out.println(results)); 
} 
+0

如此简单,但我无法找出自己....我从来没有用过doOnNext()... – Shvalb

+0

非常感谢您对您的帮助! – Shvalb

+0

@dwursteisen任何方式来做到这一点在Android上(没有lambda)? – Polyakoff