RxJava的生产者 - 消费者

问题描述:

我试图用rx风格的消费者数量有限(例如2)来执行长时间的操作。RxJava的生产者 - 消费者

问题是如何确保只有两位消费者同时执行其工作。

让我们有一个用户界面:

public interface Consumer{ 
    //Take a lot of time 
    Observable<Result> doJob(Task task); 

} 

和队列类:

public class Queue { 
    public void enqueue(Task task){ 
     //TODO: enqueue task and do it with limited count of Consumers 
    } 
} 

如何组织任务队列和消费的工作?

使用调度程序。将同时工作限制为两项任务:

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2)); 
events.flatMap(x -> 
    Observable.fromCallable(() -> process(x)) 
       .subscribeOn(scheduler)) 
    .subscribe(subscriber);