如何使用RxJava2和Retrofit2创建并行多个非阻塞服务请求
问题描述:
我需要一些帮助来实现使用RxJava2的并行异步调用& Retrofit2。 我的要求是;如何使用RxJava2和Retrofit2创建并行多个非阻塞服务请求
1)我有多个保险公司(现在我只需要两个),我需要发送多个使用该保险公司名称的并行请求。
2)如果它们中的任何一个给服务器错误,那么其余的请求不应该被阻塞。
以下是我到现在为止所尝试的;
ArrayList<String> arrInsurer = new ArrayList<>();
arrInsurer.add(AppConstant.HDFC);
arrInsurer.add(AppConstant.ITGI);
RequestInterface service = getService(ServiceAPI.CAR_BASE_URL);
for (String insurerName : arrInsurer) {
service.viewQuote(Utils.getPrefQuoteId(QuoteListActivity.this), insurerName)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ViewQuoteResDTO>() {
@Override
public void accept(@NonNull ViewQuoteResDTO viewQuoteResDTO) throws Exception {
Log.e("Demo", viewQuoteResDTO.getPremiumData().getIDV()+"");
updateList();
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e("Demo", throwable.getMessage());
}
});
}
private RequestInterface getService(String baseUrl) {
Gson gson = new GsonBuilder()
.setLenient()
.create();
return new Retrofit.Builder()
.baseUrl(baseUrl)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create(gson))
.build().create(RequestInterface.class);
}
现在,上面的代码只有在两个请求都能成功响应的情况下才能正常工作。但是当任何请求作为内部服务器错误发出响应时,请求的其余部分也会被阻塞。
下面的任何一个请求给出的日志错误我得到失败响应;
E/Demo: HTTP 500 Aww Snap, Some thing happened at server. Please try back again later.
E/Demo: unexpected end of stream on Connection{100.xxx.xxx.xx:portNo, [email protected] hostAddress=/100.xxx.xxx.xx:portNo cipherSuite=none protocol=http/1.1}
如何处理这个错误?
答
我想像任何其他Rx相关的问题,这有多个答案。我会在我们的应用程序中使用我的应用程序,并解决这个用例。希望能帮助到你。
短版本 - 这依赖于mergeDelayError
。检查出here
为什么merge
?因为与concat
不同,它将并行执行观察值。为什么mergeDelayError
?它延迟了错误...本质上它会执行每个可观察的事物并在所有事情完成时传递错误。这可以确保即使出现一个或多个错误,其他错误仍会被执行。
你必须小心一些细节。事件顺序不再保留,这意味着merge
运算符可能会交织一些可观察事件(鉴于您以前如何做事,这不应该成为问题)。据我所知,即使多个可疑事件失败,你只会得到一个onError
的电话。如果这两个都ok,那么你可以尝试以下方法:
List<Observable<ViewQuoteResDTO>> observables = new ArrayList<>();
for (String insurerName : arrInsurer) {
observables.add(service.viewQuote(
Utils.getPrefQuoteId(QuoteListActivity.this), insurerName));
}
Observable.mergeDelayError(observables)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* subscriber calls if you need them */);
的想法是创建一个你要运行,然后使用mergeDelayError
触发所有这些观测。