异步等待另一个请求没有阻止

问题描述:

我有一个websocket应用程序,这是一个OWIN中间件。当一个请求到来时,WebSocket的处理程序的一个新的实例启动,然后等待在这样的循环icoming消息:异步等待另一个请求没有阻止

var buffer = new byte[1024*64]; 
Tuple<ArraySegment<byte>, WebSocketMessageType> received; 
do 
{ 
    received = await _webSocket.ReceiveMessage(buffer, _cancellationToken.Token); 
    if (received.Item1.Count > 0 && someConditionForCallingDoSomething) 
    { 
     await DoSomething(received.Item1); 
    } 
    else if(isAnswer) 
    { 
     QueueAnswer(received.Item1); 
    } 
} while (received.Item2 != WebSocketMessageType.Close); 

任务从_webSocket.ReceiveMessage返回数据可用时将完成。

DoSomething处理它的数据,然后通过websocket连接发送一些东西。然后它应该通过websocket连接等待消息。处理完这条消息后,它应该做一些工作并返回(任务)。 也许这个小图解释了它somhow简单:

_______________ 
| DoSomething | 
|-------------| 
| Work ---------> Send WS message 
|    | 
|  ??  | 
|    | 
| More Work <------- Receive WS message 
| |   | 
| V   | 
| return; | 
|_____________| 

Do some work with the data 
      | 
      V 
    Send a message 
      | 
      V 
    Wait for an answer 
      | 
      V 
    Process answer 
      | 
      V 
     finish 

我试图等待一个答案:

var answerCancel = new CancellationTokenSource(); 
answerCancel.CancelAfter(30 * 1000); 

var answer = await Task.Run(async() => 
    { 
     string tmpAnswer = null; 

     while (!_concurrentAnswerDict.TryGetValue(someKey, out tmpAnswer)) { 
      await Task.Delay(150, answerCancel.Token); 
     } 

     return tmpAnswer; 
    }, answerCancel.Token); 

但这似乎阻挡,直到任务被取消。当我调试程序时,我会在30秒后看到QueueAnswer的调用。我认为,Task.Run将在新线程中运行该功能,但似乎没有。从Task.Run被阻止的角度看,我认为它不起作用是合乎逻辑的,因为我等待执行DoSomething,因此接收新消息也会被阻止。

我的问题是:如何实现这样的行为?在完成之前,如何让DoSomething等待另一个websocket消息?

预先感谢您的每一丝

卢卡斯

+0

真的不清楚最后一段代码是在哪里进来的。那是'DoSomething'吗?如果你能提供一个[mcve]而不是点点滴滴,那将是有用的。 –

+2

'等待Task.Run(...)'通常是某种形式的误解的警告标志。你正在一个完全可用的线程上运行代码。你将把工作推到一个新的Task中,然后*放弃*你正在使用的线程,直到完成新的Task任务。 –

+0

'如何使DoSomething在完成之前等待另一个websocket消息?它是否总是等待下一条消息?如果是这样,'DoSomething'可以调用'_webSocket.ReceiveMessage'。 –

首先,我建议使用SignalR,因为他们处理了很多对你这个坚硬的东西。但如果你想自己动手,请继续阅读...

此外,我假设“做工作”和“回答”消息可以以任何顺序到达同一个网络套接字,并且您是使用_concurrentAnswerDict来协调来自DoSomething的传出“问题”消息与传入的“应答”消息。

在这种情况下,您将需要一个独立于DoSomething的“websocket阅读器”任务;你不能让你的读者awaitDoSomething,因为这会阻止阅读答案。我认为这是你遇到的主要问题。

这是一种罕见的情况,它可能被接受而不是await一项任务。假设DoSomething能捕捉到自己的异常和处理日志记录和诸如此类的东西,那么我们就可以把它当作一个独立的“主”,而忽略它返回的任务:

var buffer = new byte[1024*64]; 
Tuple<ArraySegment<byte>, WebSocketMessageType> received; 
do 
{ 
    received = await _webSocket.ReceiveMessage(buffer, _cancellationToken.Token); 
    if (received.Item1.Count > 0 && someConditionForCallingDoSomething) 
    { 
    var _ = DoSomething(received.Item1); 
    } 
    else if(isAnswer) 
    { 
    QueueAnswer(received.Item1); 
    } 
} while (received.Item2 != WebSocketMessageType.Close); 

,并应允许QueueAnswer运行而DoSomething尚未完成。

我想,Task.Run会在新线程中运行该函数,但它似乎没有。从Task.Run阻止的角度来看,我认为它不起作用是合乎逻辑的,因为我在等待DoSomething的执行,因此接收新消息也会被阻止。

Task.Run在另一个线程中运行。但是DoSomething(异步)正在等待它完成,并且在读取下一条消息之前,读取循环(异步)等待DoSomething完成。

其他说明:

while (!_concurrentAnswerDict.TryGetValue(someKey, out tmpAnswer)) { 
    await Task.Delay(150, answerCancel.Token); 
} 

这似乎很奇怪我。我会建议使用密钥字典TaskCompletionSource<Answer>而不是Answer。然后,QueueAnswer将调用TaskCompletionSource<Answer>.SetResult,并且此代码将仅等待TaskCompletionSource<Answer>.Task(如果需要超时则与Task.Delay一起)。

+0

省略等待使其工作。感谢您的详细解答! :) –