Kotlin:使用非阻塞I/O阻塞协程

问题描述:

我正在尝试使用Kotlin协同来处理非阻塞I/O。情况如下:Kotlin:使用非阻塞I/O阻塞协程

  1. 从螺纹2上运行的线程1
  2. 等待这个数据然后消费它异步回调接收数据。

我当前的代码看起来像这样(简化为简洁起见):

private var latch = CountDownLatch(1) 
private var data: Any? = null 

// Async callback from non-blocking I/O 
fun onReceive(data: Any) { 
    currentData = data 
    latch.countDown() 
} 

// Wait and consume data 
fun getData(): Any? { 
    latch.await() 
    latch = CountDownLatch(1) 
    return currentData 
} 

fun processData() { 
    launch(CommonPool) { 
     while (true) { 
      val data = getData() 
      // Consume data     
     } 
    } 
} 

据我了解,科特林协同程序应该能够帮助我摆脱CountDownLatch的。阅读后,所有我能想出是这样的:

// Wait and consume data 
fun getData() = async(CommonPool) { 
    latch.await() 
    latch = CountDownLatch(1) 
    currentData 
} 

fun processData() { 
    launch(CommonPool) { 
     while (true) { 
      runBlocking { 
       val data = getData().await() 
       // Consume data     
      } 
     } 
    } 
} 

我也试图与Pipelines,具有相似的结果。我显然不理解如何使用这些功能。

+0

从问题中的代码很难理解你的目标是什么。请说明什么是外部函数。 – voddan

+0

在这种情况下,需要知道哪些API调用返回承诺以及哪些类型。请将此信息添加到问题 – voddan

+0

@voddan我的目标是等待来自非阻塞源的数据准备就绪并对其进行处理(实际源是用户输入,但我认为这不相关,因为它可能是不同的非阻塞源)。当数据准备好处理时,非阻塞源调用'onReceive()'。数据类型是不相关的,你可以认为它最适合你的任何类型(在我的实际代码中数据类型是一个自定义类)。 – m0skit0

你没有说过,如果在onReceive()收到的数据可以并行处理。这是主要问题。如果是的话,你可以在onReceive()。如果不允许,请让onReceive()的每个电话在CommonPool上启动一个任务,而不使用任何协程。如果他们应该按顺序处理,那么最简单的方法是启动一个线程里面循环:

fun onReceive(data: Any) { 
    queue.put(data); 
} 

.... 

// loop in a thread 
while(true) { 
    data = queue.take(); 
    processData(data); 
} 

同样,不需要协同程序。

通常,协程是句法糖,用来表示异步程序,就好像它是同步的。我不认为你的程序是使用协程的情况。