实现异步写入时的读取
问题描述:
我有一个流可以同时读取和写入。 MSDN文档说,只有到达流的末尾时,我才应该从Read(buffer, offset, count)
方法中返回零。实现异步写入时的读取
由于读写操作是异步的,所以如果内部缓冲区为空,读取需要等待另一个写入。
我正在努力写作方法如何表示一切都已写好。我能想到的最好的方法是Dispose()
(或Close()
)方法将标志着写作的结束,但这感觉非常错误。
我流类实现为:
public class ContinuousStream : Stream
{
private readonly IProducerConsumerCollection<byte> _buffer;
public ContinuousStream() => _buffer = new ConcurrentQueue<byte>();
public override int Read(byte[] buffer, int offset, int count)
{
var maxByteCount = offset + count > buffer.Length ? buffer.Length : count;
var actualBytesRead = 0;
for (var i = offset; i < maxByteCount && _buffer.TryTake(out var b); i++)
{
buffer[i] = b;
actualBytesRead++;
}
return actualBytesRead;
}
public override void Write(byte[] buffer, int offset, int count)
{
var maxByteCount = offset + count > buffer.Length ? buffer.Length : count;
for (var i = offset; i < maxByteCount; i++)
{
_buffer.TryAdd(buffer[i]);
}
}
}
为了澄清
同时读取和写入发生。阅读不应该等到所有内容在运行之前进行缓冲。我怎样才能向这个信号流发出将要发生的所有写作都已经发生?
答
其实,我不太明白为什么做你需要为这个流,不能使用IProducerConsumerCollection单独。许多这些参数都没有意义。只需使用TryTake & TryAdd而不是读取&写在第一位。
但是你可以使用这个的AutoResetEvent:
public class ContinuousStream : Stream
{
private readonly AutoResetEvent _are = new AutoResetEvent(false);
private readonly IProducerConsumerCollection<byte> _buffer = new ConcurrentQueue<byte>();
public override int Read(byte[] buffer, int offset, int count)
{
_are.WaitOne();
var maxByteCount = offset + count > buffer.Length ? buffer.Length : count;
var actualBytesRead = 0;
for (var i = offset; i < maxByteCount && _buffer.TryTake(out var b); i++)
{
buffer[i] = b;
actualBytesRead++;
}
return actualBytesRead;
}
public override void Write(byte[] buffer, int offset, int count)
{
var maxByteCount = offset + count > buffer.Length ? buffer.Length : count;
for (var i = offset; i < maxByteCount; i++)
{
_buffer.TryAdd(buffer[i]);
_are.Set();
}
}
}
有许多不同的方法可以解决这个,使你的问题太宽泛。但是,你可以看看'NetworkStream'作为例子。它包装了一个'Socket',它有一个指示流结束的机制('Shutdown()')。你可以简单地在你的类中实现类似的东西,这是写代码调用的一种方法,表示它已经完成了写操作,这样你就可以知道何时从读操作中返回0字节。您也可以直接使用NetworkStream(即使用'Socket')或一些非流机制来进行通信(例如'BlockingCollection')。 –
谢谢@PeterDuniho。实际上我刚刚添加了一个'CloseWrite()'方法,它将表示不会再有任何写入。起初它感觉错了,因为它不能用于需要流的地方。我越想到实际的例子,我认为这可能是正确的解决方案。 – BanksySan