用RxJava限制吞吐量

用RxJava限制吞吐量

问题描述:

我现在进入的情况很难解释,所以我会写一个更简单的版本来解释这个问题。用RxJava限制吞吐量

我有一个Observable.from()它发出文件的一个ArrayList定义的文件序列。所有这些文件都应该上传到服务器。为此,我有一个可以完成这项工作的功能并返回Observable

Observable<Response> uploadFile(File file); 

当我运行这段代码它变得疯狂,在Observable.from()发出的所有文件,并在上传都在的,或至少是线程,它可以处理的最大值。

我想最多并行2个文件上传。有没有可以为我处理这个问题的操作员?

我试图缓冲窗口和其他一些人,但他们似乎只发出两个项目一起,而不必不断两条平行的文件上传。我也尝试在上传部分设置最大线程池,但这不能用于我的情况。

这个权利应该有一个简单的操作符?我错过了什么吗?

我认为所有文件都是并行上传的,因为您使用的是flatMap(),它同时执行所有转换。相反,您应该使用concatMap(),它会一个接一个地运行一个转换。要运行两个并行上传,您需要在您的文件上调用window(2) observable,然后像在代码中那样调用flatMap()

Observable<Response> responses = 
    files 
     .window(2) 
     .concatMap(windowFiles -> 
     windowFiles.flatMap(file -> uploadFile(file)); 
    ); 

UPDATE

我发现了一个更好的解决方案,这不正是你想要的东西。有flatMap()的超载接受最大并发线程数。

Observable<Response> responses = 
    files 
     .onBackpressureBuffer() 
     .flatMap(index -> { 
     return uploadFile(file).subscribeOn(Schedulers.io()); 
     }, 2); 
+0

这听起来很完美!我会尝试并让你知道。 –

+0

不错的窗口操作工作现在完美!我怎样才能让窗户移动?现在,如果窗口发出文件1和文件2以供上传,则等待两者都完成。如果文件2完成且1仍在进行中,是否可以执行文件3的上载? –

+0

我不确定使用默认的操作符是可能的,所以您可能需要编写自己的操作符。 – Michael