如何使用RxJava制作多个API请求并将它们合并?

问题描述:

我必须进行N REST API调用并合并所有这些调用的结果,或者如果至少有一个调用失败(返回错误或超时),则失败。 我想用RxJava,我有一些要求:如何使用RxJava制作多个API请求并将它们合并?

  • 能够在某些情况下,以配置每个单独的API调用的重试。我的意思是,如果我有一个重试= 2,并且我提出了3个请求,则每个请求最多需要重试2次,总共最多有6个请求。
  • 快速失败!如果一次API调用失败了N次(其中N是重试的配置),但如果剩余的请求还没有结束,那么它并不重要,我想返回一个错误。

如果我希望用一个线程完成所有的请求,我需要一个异步Http客户端,不会吗?

谢谢。

你可以使用Zip运营商,一旦他们结束拉上所有的请求一起,检查那里,如果他们都是成功

private Scheduler scheduler; 
private Scheduler scheduler1; 
private Scheduler scheduler2; 

/** 
* Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel. 
* By default Rx is not async, only if you explicitly use subscribeOn. 
*/ 
@Test 
public void testAsyncZip() { 
    scheduler = Schedulers.newThread(); 
    scheduler1 = Schedulers.newThread(); 
    scheduler2 = Schedulers.newThread(); 
    long start = System.currentTimeMillis(); 
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
      .concat(s3)) 
      .subscribe(result -> showResult("Async in:", start, result)); 
} 

private Observable<String> obAsyncString() { 
    return Observable.just("Request1") 
      .observeOn(scheduler) 
      .doOnNext(val -> { 
       System.out.println("Thread " + Thread.currentThread() 
         .getName()); 
      }) 
      .map(val -> "Hello"); 
} 

private Observable<String> obAsyncString1() { 
    return Observable.just("Request2") 
      .observeOn(scheduler1) 
      .doOnNext(val -> { 
       System.out.println("Thread " + Thread.currentThread() 
         .getName()); 
      }) 
      .map(val -> " World"); 
} 

private Observable<String> obAsyncString2() { 
    return Observable.just("Request3") 
      .observeOn(scheduler2) 
      .doOnNext(val -> { 
       System.out.println("Thread " + Thread.currentThread() 
         .getName()); 
      }) 
      .map(val -> "!"); 
} 

在这个例子中,我们却只是CONCAT的结果,而不是这样做,你可以检查结果并在那里做你的业务逻辑。也可以考虑mergecontact

您可以在这里https://github.com/politrons/reactive

+0

看看更多的例子创造了超过1'Schedulers.newThread()'是不必要的。它为每个创建的工作人员创建一个新的线程,因此只需重复使用相同的调度程序就可以获得相同的结果。 – Kiskae