RxJava的生产者 - 消费者
问题描述:
我试图用rx风格的消费者数量有限(例如2)来执行长时间的操作。RxJava的生产者 - 消费者
问题是如何确保只有两位消费者同时执行其工作。
让我们有一个用户界面:
public interface Consumer{
//Take a lot of time
Observable<Result> doJob(Task task);
}
和队列类:
public class Queue {
public void enqueue(Task task){
//TODO: enqueue task and do it with limited count of Consumers
}
}
如何组织任务队列和消费的工作?
答
使用调度程序。将同时工作限制为两项任务:
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
events.flatMap(x ->
Observable.fromCallable(() -> process(x))
.subscribeOn(scheduler))
.subscribe(subscriber);