用RxJava限制吞吐量
问题描述:
我现在进入的情况很难解释,所以我会写一个更简单的版本来解释这个问题。用RxJava限制吞吐量
我有一个Observable.from()
它发出文件的一个ArrayList
定义的文件序列。所有这些文件都应该上传到服务器。为此,我有一个可以完成这项工作的功能并返回Observable
。
Observable<Response> uploadFile(File file);
当我运行这段代码它变得疯狂,在Observable.from()
发出的所有文件,并在上传都在的,或至少是线程,它可以处理的最大值。
我想最多并行2个文件上传。有没有可以为我处理这个问题的操作员?
我试图缓冲,窗口和其他一些人,但他们似乎只发出两个项目一起,而不必不断两条平行的文件上传。我也尝试在上传部分设置最大线程池,但这不能用于我的情况。
这个权利应该有一个简单的操作符?我错过了什么吗?
答
我认为所有文件都是并行上传的,因为您使用的是flatMap()
,它同时执行所有转换。相反,您应该使用concatMap()
,它会一个接一个地运行一个转换。要运行两个并行上传,您需要在您的文件上调用window(2)
observable,然后像在代码中那样调用flatMap()
。
Observable<Response> responses =
files
.window(2)
.concatMap(windowFiles ->
windowFiles.flatMap(file -> uploadFile(file));
);
UPDATE:
我发现了一个更好的解决方案,这不正是你想要的东西。有flatMap()
的超载接受最大并发线程数。
Observable<Response> responses =
files
.onBackpressureBuffer()
.flatMap(index -> {
return uploadFile(file).subscribeOn(Schedulers.io());
}, 2);
这听起来很完美!我会尝试并让你知道。 –
不错的窗口操作工作现在完美!我怎样才能让窗户移动?现在,如果窗口发出文件1和文件2以供上传,则等待两者都完成。如果文件2完成且1仍在进行中,是否可以执行文件3的上载? –
我不确定使用默认的操作符是可能的,所以您可能需要编写自己的操作符。 – Michael