RxJava2如何与其他两个观察员共享可观察到的,然后把它合并到用户

RxJava2如何与其他两个观察员共享可观察到的,然后把它合并到用户

问题描述:

我有以下方法RxJava2如何与其他两个观察员共享可观察到的,然后把它合并到用户

Document createDocument(String url); 
List<MediaContent> getVideo(Document doc); 
List<MediaContent> getImages(Document doc); 

名单< MediaContent>将

void appendToRv(List<MediaContent> media); 

消耗我喜欢使用RxJava2,使得

CreateDocument -> getVideo -> 
          -> appendToRv 
       -> getImages -> 

(另外,视频输出应该是有序的b efore图像)。

我该怎么做呢?我试过flatMap,但它似乎只允许使用

Single<List<MediaContent>> single = 
    Single.fromCallable(() -> createDocument(url)) 

     // . ?? .. 
     // this is the part i am lost with 
     // how do i feed document to -> getVideo() and getImage() 
     // and then merge them back into the subscriber 
     // 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 

single.subscribe(parseImageSubscription); 

的DisposableSingleObserver

parseImageSubscription = new DisposableSingleObserver<List<MediaContent>>() { 
    @Override 
    public void onSuccess(List<MediaContent> media) { 
     if(media!=null) { 
      appendToRv(media); 
     } 
    } 

    @Override 
    public void onError(Throwable error) { 
     doSnackBar("error loading: '" + q + "'"); 
    } 
}; 

单可观的单一方法getVideos和getImages

Single<List<MediaContent>> SingleGetImage(Document document) { 
    return Single.create(e -> { 
     List<MediaContent> result = getImage(document); 
     if (result != null) { 
      e.onSuccess(result); 
     }else { 
      e.onError(new Exception("No images found")); 
     } 
    }); 
} 

Single<List<MediaContent>> singleGetVideo(Document document) { 
    return Single.create(e -> { 
     List<MediaContent> result = getVideo(document); 
     if (result != null) { 
      e.onSuccess(result); 
     }else { 
      e.onError(new Exception("No videos found")); 
     } 
    }); 
} 

假设你想并行执行getVideosgetImages请求,您可以使用flatMap()zip(),zip wi我们将收集来自Singles的2个发射,并且您可以将2个结果合并为一个新值,这意味着您可以对视频MediaContent列表进行排序,并将其与图像MediaContent列表结合,并返回统一列表(或其他任何对象) d):

Single<List<MediaContent>> single = 
      Single.fromCallable(() -> createDocument(url)) 
        .flatMap(document -> Single.zip(singleGetVideo(document), SingleGetImage(document), 
          (videoMediaContents, imageMediaContents) -> //here you'll have the 2 results 
        //you can sort combine etc. and return unified object 
        .subscribeOn(Schedulers.io()) 
        .observeOn(AndroidSchedulers.mainThread()); 

    single.subscribe(parseImageSubscription) 

Observable.zip()可以实现它的完美。观察者将通过此方法接收合并结果。

public void zip() { 
    Observable<Integer> observable1 = Observable.just(1); 
    Observable<Integer> observable2 = Observable.just(2); 
    Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() { 
     @Override 
     public Integer call(Integer integer, Integer integer2) { 
      return integer + integer2; 
     } 
    }).subscribe(new Observer<Integer>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(Integer o) { 
      Logger.i(o.toString()); 
      //Here will print 3. 
     } 
    }); 
}