rxjava2 - 在一个线程池执行任务,订阅在单个线程
问题描述:
我用下面的任务试验,以让我的头周围RxJava的简单的例子:rxjava2 - 在一个线程池执行任务,订阅在单个线程
- 给出的URL列表
- 待办事项用于在线程池
- 对于每个结果每个URL一个HTTP请求中插入一些数据到SQLite数据库(这里没有多线程)
- 块中的方法,直到它完成
所以,我想它在科特林:
val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
.observeOn(Schedulers.from(ex))
.map { Thread.currentThread().name }
.subscribe { println(it + " " + Thread.currentThread().name }
我希望它打印
pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....
但是它打印:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
任何人都可以纠正我关于如何工作的误解?为什么它不使用线程池的所有线程?我如何让我的订阅者在主线程上运行或阻塞直到完成?
答
Rx并不是指平行执行服务,因此使用Java的流API。 Rx事件是同步的,随后将流过流。 observeOn在构建流时会请求线程一次,并在该线程上逐个处理排放。
您还希望subscribe
在主线程上执行。 observeOn
切换线程,并在该线程上发生所有下游事件。如果您想切换到主线程,则必须在subscribe
之前插入另一个observeOn
。
答
使代码并行的map
块里面工作,你应该把它换到可观察到的与自己的调度:
val ex = Executors.newFixedThreadPool(10)
val scheduler = Schedulers.from(ex)
Observable.fromIterable((1..100).toList())
.flatMap {
Observable
.fromCallable { Thread.currentThread().name }
.subscribeOn(scheduler)
}
.subscribe { println(it + " " + Thread.currentThread().name) }
在这种情况下,你会看到的结果是:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-2 pool-1-thread-1
pool-1-thread-3 pool-1-thread-1
pool-1-thread-4 pool-1-thread-1
...
你可以检查文章RxJava - Achieving Parallelization,给出这种行为的解释。
另外,RxJava 2.0.5引入ParallelFlowable API