实现异步写入时的读取

问题描述:

我有一个流可以同时读取和写入。 MSDN文档说,只有到达流的末尾时,我才应该从Read(buffer, offset, count)方法中返回零。实现异步写入时的读取

由于读写操作是异步的,所以如果内部缓冲区为空,读取需要等待另一个写入。

我正在努力写作方法如何表示一切都已写好。我能想到的最好的方法是Dispose()(或Close())方法将标志着写作的结束,但这感觉非常错误。

我流类实现为:

public class ContinuousStream : Stream 
{ 
    private readonly IProducerConsumerCollection<byte> _buffer; 

    public ContinuousStream() => _buffer = new ConcurrentQueue<byte>(); 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     var actualBytesRead = 0; 

     for (var i = offset; i < maxByteCount && _buffer.TryTake(out var b); i++) 
     { 
      buffer[i] = b; 
      actualBytesRead++; 
     } 

     return actualBytesRead; 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     for (var i = offset; i < maxByteCount; i++) 
     { 
      _buffer.TryAdd(buffer[i]); 
     } 
    } 
} 

为了澄清

同时读取和写入发生。阅读不应该等到所有内容在运行之前进行缓冲。我怎样才能向这个信号流发出将要发生的所有写作都已经发生?

+0

有许多不同的方法可以解决这个,使你的问题太宽泛。但是,你可以看看'NetworkStream'作为例子。它包装了一个'Socket',它有一个指示流结束的机制('Shutdown()')。你可以简单地在你的类中实现类似的东西,这是写代码调用的一种方法,表示它已经完成了写操作,这样你就可以知道何时从读操作中返回0字节。您也可以直接使用NetworkStream(即使用'Socket')或一些非流机制来进行通信(例如'BlockingCollection ')。 –

+0

谢谢@PeterDuniho。实际上我刚刚添加了一个'CloseWrite()'方法,它将表示不会再有任何写入。起初它感觉错了,因为它不能用于需要流的地方。我越想到实际的例子,我认为这可能是正确的解决方案。 – BanksySan

其实,我不太明白为什么你需要为这个流,不能使用IProducerConsumerCollection单独。许多这些参数都没有意义。只需使用TryTake & TryAdd而不是读取&写在第一位。

但是你可以使用这个的AutoResetEvent

public class ContinuousStream : Stream 
{ 
    private readonly AutoResetEvent _are = new AutoResetEvent(false); 
    private readonly IProducerConsumerCollection<byte> _buffer = new ConcurrentQueue<byte>(); 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     _are.WaitOne(); 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     var actualBytesRead = 0; 

     for (var i = offset; i < maxByteCount && _buffer.TryTake(out var b); i++) 
     { 
      buffer[i] = b; 
      actualBytesRead++; 
     } 

     return actualBytesRead; 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     for (var i = offset; i < maxByteCount; i++) 
     { 
      _buffer.TryAdd(buffer[i]); 
      _are.Set(); 
     } 
    } 
} 
+0

这是一个流,以便它可以用于接受流的地方。你说得对,它只是一个Queue的包装。 – BanksySan

+0

看看你有什么,它会需要所有的写作之前完成任何阅读可以发生? – BanksySan

+0

什么是offset BanksySan