这种方法比仅仅在Task.Run中触发stream.Read()更好吗?

问题描述:

编辑:我没有最终根据discussionStephen Cleary做这种方法。如果你对我的做法感兴趣,请看下面我的answer这种方法比仅仅在Task.Run中触发stream.Read()更好吗?

我正在寻找一种方法来从NetworkStream超时异步读取。当然,问题在于无法取消NetworkStream上的ReadAsync(),因为它只是忽略了CancellationToken。我读了一个答案,建议关闭Token取消流,但在我的情况下,这不是一个选项,因为Tcp连接必须保持打开状态。所以我想出了下面的代码,但我想知道这是否比做一个

Task.Run(() => stream.Read(buffer, offset, count) 

只是阻止一个线程。

public static class TcpStreamExtension 
{ 
    public static async Task<int> ReadAsyncWithTimeout(this NetworkStream stream, byte[] buffer, int offset, int count) 
    { 
     CancellationTokenSource cts = new CancellationTokenSource(); 
     bool keepTrying = true; 
     Timer timer = new Timer(stream.ReadTimeout); 
     timer.Elapsed += new ElapsedEventHandler((sender, args) => stopTrying(sender, args, cts, out keepTrying)); 
     timer.Start(); 

     try 
     { 
      if (stream.CanRead) 
      { 
       while (true) 
       { 
        if (stream.DataAvailable) 
        { 
         return await stream.ReadAsync(buffer, offset, count, cts.Token).ConfigureAwait(false); 
        } 

        if (keepTrying) 
        { 
         await Task.Delay(300, cts.Token).ConfigureAwait(false); 
        } 
        else 
        { 
         cts.Dispose(); 
         timer.Dispose(); 
         throw new IOException(); 
        } 
       } 
      } 
     } 
     catch (TaskCanceledException tce) 
     { 
      // do nothing 
     } 
     finally 
     { 
      cts.Dispose(); 
      timer.Dispose(); 
     } 
     if (stream.DataAvailable) 
     { 
      return await stream.ReadAsync(buffer, offset, count).ConfigureAwait(false); 
     } 

     throw new IOException(); 
    } 

    private static void stopTrying(object sender, ElapsedEventArgs args, CancellationTokenSource cts, out bool keepTrying) 
    { 
     keepTrying = false; 
     cts.Cancel(); 
    } 

} 

的应用具有潜在的能够与几千个端点进行通信和我想要的方式,也不会*一堆线程,因为大多数它的工作是IO来创建它。另外,超时的情况应该是

+3

您可以在不关闭TCP连接的情况下关闭NetworkStream,如果您从TCPClient获取流但不使用它,请获取套接字并手动创建具有以下超载的流:'public NetworkStream(Socket socket,\t bool ownsSocket)'with 'ownSocket = false',这样就可以使TCP连接保持打开状态,并且可以根据需要创建尽可能多的数据流。 – Gusman

+0

看来,如果stream.ReadAsync引发异常,cts和timer将不会被处置。 –

+0

备注:确保在取消读取操作后可靠地找到下一条消息的开头,因为您不知道蒸汽的当前状态。 –

基础上discussionStephen Cleary和他的建议,我把在我如何实现这个第二的外观和我不每读取超时的方法去,但它保持,只要打开作为TcpClient的是开放的,然后从不同的代码控制超时。我用Task.Run(() => beginReading());所以当然会在使用线程池,但我认为它是好的,因为大多数的线程会被击中await,因此可以*

这里的时间是我实现:

private readonly Queue<byte> bigBuffer = new Queue<byte>(); 
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, 1); 

// This is called in a Task.Run() 
private async Task beginReading() 
{ 
    byte[] buffer = new byte[1024]; 

    using (_shutdownToken.Register(() => m_TcpStream.Close())) 
    { 
     while (!_shutdownToken.IsCancellationRequested) 
     { 
      try 
      { 
       int bytesReceived = 0; 
       if (m_TcpStream.CanRead) 
       { 
        bytesReceived = await m_TcpStream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false); 
       } 
       else 
       { 
        // in case the stream is not working, wait a little bit 
        await Task.Delay(3000, _shutdownToken); 
       } 

       if (bytesReceived > 0) 
       { 
        for (int i = 0; i < bytesReceived; i++) 
        { 
         bigBuffer.Enqueue(buffer[i]); 
        } 

        _signal.Release(); 
        Array.Clear(buffer, 0, buffer.Length); 

       } 
      } 
      catch (Exception e) 
      { 
       LoggingService.Log(e); 
      } 
     } 
    } 
} 

private async Task<int> ReadAsyncWithTimeout(byte[] buffer, int offset, int count) 
{ 
    int bytesToBeRead = 0; 

    if (!m_TcpClient.Connected) 
    { 
     throw new ObjectDisposedException("Socket is not connected"); 
    } 

    if (bigBuffer.Count > 0) 
    { 
     bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count; 

     for (int i = offset; i < bytesToBeRead; i++) 
     { 
      buffer[i] = bigBuffer.Dequeue(); 
     } 

     // Clear up the semaphore in case of a race condition where the writer just wrote and then this came in and read it without waiting 
     if (_signal.CurrentCount > 0) 
      await _signal.WaitAsync(BabelfishConst.TCPIP_READ_TIME_OUT_IN_MS, _shutdownToken).ConfigureAwait(false); 

     return bytesToBeRead; 
    } 

    // In case there is nothing in the Q, wait up to timeout to get data from the writer 
    await _signal.WaitAsync(15000, _shutdownToken).ConfigureAwait(false); 

    // read again in case the semaphore was signaled by an Enqueue 
    if (bigBuffer.Count > 0) 
    { 
     bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count; 

     for (int i = offset; i < bytesToBeRead; i++) 
     { 
      buffer[i] = bigBuffer.Dequeue(); 
     } 


     return bytesToBeRead; 
    } 

    // This is because the synchronous NetworkStream Read() method throws this exception when it times out 
    throw new IOException(); 
} 

对于类似的用例,我使用了一个Task.Delay()任务超时。 是这样的:

public static async Task<int> ReadAsync(
     NetworkStream stream, byte[] buffer, int offset, int count, int timeoutMillis) 
{ 
     if (timeoutMillis < 0) throw new ArgumentException(nameof(timeoutMillis)); 
     else if (timeoutMillis == 0) 
     { 
      // No timeout 
      return await stream.ReadAsync(buffer, offset, count); 
     } 

     var cts = new CancellationTokenSource(); 
     var readTask = stream.ReadAsync(buffer, offset, count, cts.Token); 
     var timerTask = Task.Delay(timeoutMillis, cts.Token); 

     var finishedTask = await Task.WhenAny(readTask, timerTask); 
     var hasTimeout = ReferenceEquals(timerTask, finishedTask); 
     // Cancel the timer which might be still running 
     cts.Cancel(); 
     cts.Dispose(); 

     if (hasTimeout) throw new TimeoutException(); 
     // No timeout occured 
     return readTask.Result; 
} 
+0

我最初采用相同的方法,但后来我发现有些消息会丢失。我认为发生了什么是readAsync调用将保持打开并泄漏消息。 –

+0

一旦进入第一次超时并取消读取,您基本上不再保证流的状态。可能已经有一些数据已经被使用,或者可能没有。您可以做的唯一事情就是在该层发生任何超时后关闭网络流。 – Matthias247

+0

其实我试过了Gusman在问题的评论中提出的建议,但似乎也没有效果。在没有客户端的情况下关闭流也会导致数据丢失。 –

首先,你正在试图做的是根本性的缺陷。您应该始终从开放的TCP/IP流中读取 - 只要一次读取某些数据,就将其传递并开始下一次读取。

所以,我的第一个建议是不是需要首先取消可读的阅读。相反,始终保持阅读。同样,使用DataAvailable是一种代码异味。

更多的解释...

没有“强制执行”消除对非取消代码的好方法。关闭TCP/IP套接字是最简单和最干净的方法。您的现有解决方案将无法工作,因为ReadAsync忽略了CancellationToken。所以它没有比使用CancellationToken没有定时器更好。如果ReadAsync忽略CancellationToken,您唯一真正的选择是关闭套接字。任何其他解决方案都可能导致“丢失数据” - 从套接字中读取但丢弃的数据。

+0

这就是我正在寻找的那种批评。我会尝试@Gusman在评论中提出的建议,并将报告如何进行。 –

+1

@SvetAngelov:我不确定这是否会奏效。这取决于'NetworkStream'如何从'Socket'中读取。许多流都有某种缓冲区,如果流关闭并且套接字被重用,这会导致数据丢失。 –

+0

我希望我有办法解决取消问题,但不幸的是,我们所连接的端点,一个不会改变的遗留设备,希望我们尝试连接一段时间,如果我们没有得到答复,我们要取消,然后等一会再试。我会看看我是否可以做任何事情来解决这个问题 –