如何使用RxJava制作多个API请求并将它们合并?
问题描述:
我必须进行N REST API调用并合并所有这些调用的结果,或者如果至少有一个调用失败(返回错误或超时),则失败。 我想用RxJava,我有一些要求:如何使用RxJava制作多个API请求并将它们合并?
- 能够在某些情况下,以配置每个单独的API调用的重试。我的意思是,如果我有一个重试= 2,并且我提出了3个请求,则每个请求最多需要重试2次,总共最多有6个请求。
- 快速失败!如果一次API调用失败了N次(其中N是重试的配置),但如果剩余的请求还没有结束,那么它并不重要,我想返回一个错误。
如果我希望用一个线程完成所有的请求,我需要一个异步Http客户端,不会吗?
谢谢。
答
你可以使用Zip
运营商,一旦他们结束拉上所有的请求一起,检查那里,如果他们都是成功
private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;
/**
* Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
* By default Rx is not async, only if you explicitly use subscribeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
private Observable<String> obAsyncString() {
return Observable.just("Request1")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
private Observable<String> obAsyncString1() {
return Observable.just("Request2")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
private Observable<String> obAsyncString2() {
return Observable.just("Request3")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
在这个例子中,我们却只是CONCAT的结果,而不是这样做,你可以检查结果并在那里做你的业务逻辑。也可以考虑merge
或contact
。
看看更多的例子创造了超过1'Schedulers.newThread()'是不必要的。它为每个创建的工作人员创建一个新的线程,因此只需重复使用相同的调度程序就可以获得相同的结果。 – Kiskae