Kotlin:使用非阻塞I/O阻塞协程
问题描述:
我正在尝试使用Kotlin协同来处理非阻塞I/O。情况如下:Kotlin:使用非阻塞I/O阻塞协程
- 从螺纹2上运行的线程1
- 等待这个数据然后消费它异步回调接收数据。
我当前的代码看起来像这样(简化为简洁起见):
private var latch = CountDownLatch(1)
private var data: Any? = null
// Async callback from non-blocking I/O
fun onReceive(data: Any) {
currentData = data
latch.countDown()
}
// Wait and consume data
fun getData(): Any? {
latch.await()
latch = CountDownLatch(1)
return currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
val data = getData()
// Consume data
}
}
}
据我了解,科特林协同程序应该能够帮助我摆脱CountDownLatch的。阅读后,所有我能想出是这样的:
// Wait and consume data
fun getData() = async(CommonPool) {
latch.await()
latch = CountDownLatch(1)
currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
runBlocking {
val data = getData().await()
// Consume data
}
}
}
}
我也试图与Pipelines,具有相似的结果。我显然不理解如何使用这些功能。
答
你没有说过,如果在onReceive()
收到的数据可以并行处理。这是主要问题。如果是的话,你可以在onReceive()
。如果不允许,请让onReceive()
的每个电话在CommonPool
上启动一个任务,而不使用任何协程。如果他们应该按顺序处理,那么最简单的方法是启动一个线程里面循环:
fun onReceive(data: Any) {
queue.put(data);
}
....
// loop in a thread
while(true) {
data = queue.take();
processData(data);
}
同样,不需要协同程序。
通常,协程是句法糖,用来表示异步程序,就好像它是同步的。我不认为你的程序是使用协程的情况。
从问题中的代码很难理解你的目标是什么。请说明什么是外部函数。 – voddan
在这种情况下,需要知道哪些API调用返回承诺以及哪些类型。请将此信息添加到问题 – voddan
@voddan我的目标是等待来自非阻塞源的数据准备就绪并对其进行处理(实际源是用户输入,但我认为这不相关,因为它可能是不同的非阻塞源)。当数据准备好处理时,非阻塞源调用'onReceive()'。数据类型是不相关的,你可以认为它最适合你的任何类型(在我的实际代码中数据类型是一个自定义类)。 – m0skit0