IO完成端口初始读取和双向数据
我有以下简化的IO完成端口服务器C++代码:IO完成端口初始读取和双向数据
int main(..)
{
startCompletionPortThreadProc();
// Await client connection
sockaddr_in clientAddress;
int clientAddressSize = sizeof(clientAddress);
SOCKET acceptSocket = WSAAccept(serverSocket, (SOCKADDR*)&clientAddress, &clientAddressSize, NULL, NULL);
// Connected
CreateIoCompletionPort((HANDLE)acceptSocket, completionPort, 0, 0);
// Issue initial read
read(acceptSocket);
}
DWORD WINAPI completionPortThreadProc(LPVOID param)
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
while(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE))
{
if(WaitForSingleObject(exitEvent, 0) == WAIT_OBJECT_0)
{
break;
}
if(!perIoData)
continue;
if(bytesTransferred == 0)
{
//TODO
}
switch(perIoData->operation)
{
case OPERATION_READ:
{
// Bytes have been received
if(bytesTransferred < perIoData->WSABuf.len)
{
// Terminate string
perIoData->WSABuf.buf[bytesTransferred] = '\0';
perIoData->WSABuf.buf[bytesTransferred+1] = '\0';
}
// Add data to message build
message += std::tstring((TCHAR*)perIoData->WSABuf.buf);
// Perform next read
perIoData->WSABuf.len = sizeof(perIoData->inOutBuffer);
perIoData->flags = 0;
if(WSARecv(perIoData->socket, &(perIoData->WSABuf), 1, &bytesTransferred, &(perIoData->flags), &(perIoData->overlapped), NULL) == 0)
{
// Part message
continue;
}
if(WSAGetLastError() == WSA_IO_PENDING)
{
// End of message
//TODO: Process message here
continue;
}
}
}
break;
case OPERATION_WRITE:
{
perIoData->bytesSent += bytesTransferred;
if(perIoData->bytesSent < perIoData->bytesToSend)
{
perIoData->WSABuf.buf = (char*)&(perIoData->inOutBuffer[perIoData->bytesSent]);
perIoData->WSABuf.len = (perIoData->bytesToSend - perIoData->bytesSent);
}
else
{
perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer;
perIoData->WSABuf.len = _tcslen(perIoData->inOutBuffer) * sizeof(TCHAR);
perIoData->bytesSent = 0;
perIoData->bytesToSend = perIoData->WSABuf.len;
}
if(perIoData->bytesToSend)
{
if(WSASend(perIoData->socket, &(perIoData->WSABuf), 1, &bytesTransferred, 0, &(perIoData->overlapped), NULL) == 0)
continue;
if(WSAGetLastError() == WSA_IO_PENDING)
continue;
}
}
break;
}
}
return 0;
}
bool SocketServer::read(SOCKET socket, HANDLE completionPort)
{
PER_IO_DATA* perIoData = new PER_IO_DATA;
ZeroMemory(perIoData, sizeof(PER_IO_DATA));
perIoData->socket = socket;
perIoData->operation = OPERATION_READ;
perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer;
perIoData->WSABuf.len = sizeof(perIoData->inOutBuffer);
perIoData->overlapped.hEvent = WSACreateEvent();
DWORD bytesReceived = 0;
if(WSARecv(perIoData->socket, &(perIoData->WSABuf), 1, &bytesReceived, &(perIoData->flags), &(perIoData->overlapped), NULL) == SOCKET_ERROR)
{
int gle = WSAGetLastError();
if(WSAGetLastError() != WSA_IO_PENDING)
{
delete perIoData;
return false;
}
}
return true;
}
bool SocketServer::write(SOCKET socket, std::tstring& data)
{
PER_IO_DATA* perIoData = new PER_IO_DATA;
ZeroMemory(perIoData, sizeof(PER_IO_DATA));
perIoData->socket = socket;
perIoData->operation = OPERATION_WRITE;
perIoData->WSABuf.buf = (char*)data.c_str();
perIoData->WSABuf.len = _tcslen(data.c_str()) * sizeof(TCHAR);
perIoData->bytesToSend = perIoData->WSABuf.len;
perIoData->overlapped.hEvent = WSACreateEvent();
DWORD bytesSent = 0;
if(WSASend(perIoData->socket, &(perIoData->WSABuf), 1, &bytesSent, 0, &(perIoData->overlapped), NULL) == SOCKET_ERROR)
{
if(WSAGetLastError() != WSA_IO_PENDING)
{
delete perIoData;
return false;
}
}
return true;
}
1)第一个问题我有与初始读取。
在客户端连接(接受)上,我发出读取。由于客户端尚未发送任何数据,WSAGetLastError()是WSA_IO_PENDING,并且读取方法返回。
当客户端发送数据时,线程仍然停留在GetQueuedCompletionStatus调用中(因为我认为我需要另一个WSARecv调用?)。
我应该继续循环读取方法,直到数据到达?这似乎并不合逻辑,我认为通过发布初始读GetQueuedCompletionStatus将在数据到达时完成。
2)我需要双向读写数据而无需确认。因此我也创建了一个IOCP线程的客户端。实际上可以用完成端口来做到这一点,还是必须在写入之后进行读取?
对于感觉像基本问题那样感到抱歉,但在拖网和构建IOCP示例之后,我仍然无法回答这些问题。
非常感谢提前。
在客户端连接(接受)上,我发出读取。由于客户端尚未发送任何数据,WSAGetLastError()是WSA_IO_PENDING,并且读取方法返回。
这是正常行为。
当客户端发送数据时,线程仍然停留在GetQueuedCompletionStatus调用中(因为我认为我需要另一个WSARecv调用?)。
不,你不需要另一个电话。如果它卡住了,那么你没有正确地将读取与I/O完成端口相关联。
我应该继续循环读取方法,直到数据到达?
不需要。您需要一次拨打WSARecv()
进行初始阅读。 WSA_IO_PENDING
错误表示读数据正在等待数据,并在数据实际到达时向I/O完成端口发送信号。不要致电WSARecv()
(或任何其他阅读功能),直到该信号实际到达。然后您可以再次拨打WSARecv()
以等待更多数据。重复,直到套接字断开连接。
我认为通过发布初始读取GetQueuedCompletionStatus可以在数据到达时完成。
这正是应该发生的事情。
2)我需要在没有确认的情况下双向读写数据。因此我也创建了一个IOCP线程的客户端。实际上是否可以用完成端口来做到这一点
是的。阅读和写作是分开的操作,它们不相互依赖。
确实读取后必须写入?
如果您的协议不需要它,不需要。
现在,就是说,你的代码有一些问题。
在一个小问题上,WSAAccept()
是同步的,您应该考虑使用AcceptEx()
来代替,因此它可以使用相同的I/O完成端口来报告新的连接。
但更重要的是,当挂起的I/O操作失败,GetQueuedCompletionStatus()
返回FALSE,返回LPOVERLAPPED
指针将非NULL,以及GetLastError()
将报告为什么I/O操作失败。但是,如果GetQueuedCompletionStatus()
本身失败,则返回的LPOVERLAPPED
指针将为空,并且GetLastError()
将报告为什么GetQueuedCompletionStatus()
失败。这种差异在documentation中有明确说明,但您的while
循环未考虑它。使用do..while
循环,而不是和行为根据LPOVERLAPPED
指针:
DWORD WINAPI completionPortThreadProc(LPVOID param)
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
do
{
if(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE))
{
// I/O success, handle perIoData based on completionKey as needed...
}
else if(perIoData)
{
// I/O failed, handle perIoData based on completionKey as needed...
}
else
{
// GetQueuedCompletionStatus() failure...
break;
}
}
while(WaitForSingleObject(exitEvent, 0) == WAIT_TIMEOUT);
return 0;
}
在一个侧面说明,而不是使用一个事件对象发出信号时completionPortThreadProc()
应该退出,可以考虑使用PostQueuedCompletionionStatus()
而不是终止completionKey邮寄到我/ O完成端口,那么你的循环可以查找值:
DWORD WINAPI completionPortThreadProc(LPVOID param)
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
do
{
if(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE))
{
if(completionKey == MyTerminateKey)
break;
if(completionKey == MySocketIOKey)
{
// I/O success, handle perIoData as needed...
}
}
else if(perIoData)
{
// I/O failed, handle perIoData based on completionKey as needed...
}
else
{
// GetQueuedCompletionStatus() failure...
break;
}
}
while(true);
return 0;
}
CreateIoCompletionPort((HANDLE)acceptSocket, completionPort, MySocketIOKey, 0);
PostQueuedCompletionStatus(completionPort, 0, MyTerminateKey, NULL);
嗨雷米, 你长的解释非常感谢,这是非常大加赞赏,我要疯了这里。我会通过您的意见并回报!我从其他例子中取得了相当数量的代码。 – CAM79
嗨,雷米,好吧,新循环是关键,并专门检查'if(perIoData)'中的GetLastError。 I/O由于WSA_OPERATION_ABORTED失败而未能完成。我有一个接受线程发布了阅读,然后结束了。我原以为它仍然可以工作,但显然我的设计是错误的。 非常感谢您的帮助,我会在您的新设计中使用您的想法。 希望您的意见也可以帮助其他人,因为很多示例似乎都使用我的代码。 – CAM79
当一个线程终止时,它所启动的任何I/O操作都将自动中止。你应该在一个线程内的一个循环中调用'WSAAccept()',该线程至少在监听套接字的生命周期内存在(或者将客户端接受移动到IO完成端口并发出来自这样的线程的初始接受) Os不会中止.. –