IO完成端口初始读取和双向数据

问题描述:

我有以下简化的IO完成端口服务器C++代码:IO完成端口初始读取和双向数据

int main(..) 
{ 
    startCompletionPortThreadProc(); 

    // Await client connection 

    sockaddr_in clientAddress; 
    int clientAddressSize = sizeof(clientAddress); 
    SOCKET acceptSocket = WSAAccept(serverSocket, (SOCKADDR*)&clientAddress, &clientAddressSize, NULL, NULL); 

    // Connected 

    CreateIoCompletionPort((HANDLE)acceptSocket, completionPort, 0, 0); 

    // Issue initial read 
    read(acceptSocket); 
} 


DWORD WINAPI completionPortThreadProc(LPVOID param) 
{ 
    DWORD bytesTransferred = 0; 
    ULONG_PTR completionKey = NULL; 
    LPPER_IO_DATA perIoData = NULL; 

    while(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE)) 
    { 
     if(WaitForSingleObject(exitEvent, 0) == WAIT_OBJECT_0) 
     { 
      break; 
     } 

     if(!perIoData) 
      continue; 

     if(bytesTransferred == 0) 
     { 
      //TODO 
     } 

     switch(perIoData->operation) 
     { 
      case OPERATION_READ: 
      { 
       // Bytes have been received 

       if(bytesTransferred < perIoData->WSABuf.len) 
       { 
        // Terminate string 
        perIoData->WSABuf.buf[bytesTransferred] = '\0'; 
        perIoData->WSABuf.buf[bytesTransferred+1] = '\0'; 
       } 

       // Add data to message build 
       message += std::tstring((TCHAR*)perIoData->WSABuf.buf); 

       // Perform next read 
        perIoData->WSABuf.len = sizeof(perIoData->inOutBuffer); 
        perIoData->flags = 0; 

        if(WSARecv(perIoData->socket, &(perIoData->WSABuf), 1, &bytesTransferred, &(perIoData->flags), &(perIoData->overlapped), NULL) == 0) 
        { 
         // Part message 
         continue; 
        } 

        if(WSAGetLastError() == WSA_IO_PENDING) 
        { 
         // End of message 
//TODO: Process message here 
         continue; 
        } 
       } 
      } 
      break; 

      case OPERATION_WRITE: 
      { 
       perIoData->bytesSent += bytesTransferred; 

       if(perIoData->bytesSent < perIoData->bytesToSend) 
       { 
        perIoData->WSABuf.buf = (char*)&(perIoData->inOutBuffer[perIoData->bytesSent]); 
        perIoData->WSABuf.len = (perIoData->bytesToSend - perIoData->bytesSent); 
       } 
       else 
       { 
        perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer; 
        perIoData->WSABuf.len = _tcslen(perIoData->inOutBuffer) * sizeof(TCHAR); 
        perIoData->bytesSent = 0; 
        perIoData->bytesToSend = perIoData->WSABuf.len; 
       } 

       if(perIoData->bytesToSend) 
       { 
        if(WSASend(perIoData->socket, &(perIoData->WSABuf), 1, &bytesTransferred, 0, &(perIoData->overlapped), NULL) == 0) 
         continue; 

        if(WSAGetLastError() == WSA_IO_PENDING) 
         continue; 
       } 
      } 
      break; 
     } 
    } 

    return 0; 
} 

bool SocketServer::read(SOCKET socket, HANDLE completionPort) 
{ 
    PER_IO_DATA* perIoData = new PER_IO_DATA; 
    ZeroMemory(perIoData, sizeof(PER_IO_DATA)); 

    perIoData->socket   = socket; 
    perIoData->operation   = OPERATION_READ; 
    perIoData->WSABuf.buf  = (char*)perIoData->inOutBuffer; 
    perIoData->WSABuf.len  = sizeof(perIoData->inOutBuffer); 
    perIoData->overlapped.hEvent = WSACreateEvent(); 

    DWORD bytesReceived = 0; 
    if(WSARecv(perIoData->socket, &(perIoData->WSABuf), 1, &bytesReceived, &(perIoData->flags), &(perIoData->overlapped), NULL) == SOCKET_ERROR) 
    { 
     int gle = WSAGetLastError(); 
     if(WSAGetLastError() != WSA_IO_PENDING) 
     { 
      delete perIoData; 
      return false; 
     } 
    } 

    return true; 
} 

bool SocketServer::write(SOCKET socket, std::tstring& data) 
{ 
    PER_IO_DATA* perIoData = new PER_IO_DATA; 
    ZeroMemory(perIoData, sizeof(PER_IO_DATA)); 

    perIoData->socket   = socket; 
    perIoData->operation   = OPERATION_WRITE; 
    perIoData->WSABuf.buf  = (char*)data.c_str(); 
    perIoData->WSABuf.len  = _tcslen(data.c_str()) * sizeof(TCHAR); 
    perIoData->bytesToSend  = perIoData->WSABuf.len; 
    perIoData->overlapped.hEvent = WSACreateEvent(); 

    DWORD bytesSent = 0; 
    if(WSASend(perIoData->socket, &(perIoData->WSABuf), 1, &bytesSent, 0, &(perIoData->overlapped), NULL) == SOCKET_ERROR) 
    { 
     if(WSAGetLastError() != WSA_IO_PENDING) 
     { 
      delete perIoData; 
      return false; 
     } 
    } 

    return true; 
} 

1)第一个问题我有与初始读取。

在客户端连接(接受)上,我发出读取。由于客户端尚未发送任何数据,WSAGetLastError()是WSA_IO_PENDING,并且读取方法返回。

当客户端发送数据时,线程仍然停留在GetQueuedCompletionStatus调用中(因为我认为我需要另一个WSARecv调用?)。

我应该继续循环读取方法,直到数据到达?这似乎并不合逻辑,我认为通过发布初始读GetQueuedCompletionStatus将在数据到达时完成。

2)我需要双向读写数据而无需确认。因此我也创建了一个IOCP线程的客户端。实际上可以用完成端口来做到这一点,还是必须在写入之后进行读取?

对于感觉像基本问题那样感到抱歉,但在拖网和构建IOCP示例之后,我仍然无法回答这些问题。

非常感谢提前。

在客户端连接(接受)上,我发出读取。由于客户端尚未发送任何数据,WSAGetLastError()是WSA_IO_PENDING,并且读取方法返回。

这是正常行为。

当客户端发送数据时,线程仍然停留在GetQueuedCompletionStatus调用中(因为我认为我需要另一个WSARecv调用?)。

不,你不需要另一个电话。如果它卡住了,那么你没有正确地将读取与I/O完成端口相关联。

我应该继续循环读取方法,直到数据到达?

不需要。您需要一次拨打WSARecv()进行初始阅读。 WSA_IO_PENDING错误表示读数据正在等待数据,并在数据实际到达时向I/O完成端口发送信号。不要致电WSARecv()(或任何其他阅读功能),直到该信号实际到达。然后您可以再次拨打WSARecv()以等待更多数据。重复,直到套接字断开连接。

我认为通过发布初始读取GetQueuedCompletionStatus可以在数据到达时完成。

这正是应该发生的事情。

2)我需要在没有确认的情况下双向读写数据。因此我也创建了一个IOCP线程的客户端。实际上是否可以用完成端口来做到这一点

是的。阅读和写作是分开的操作,它们不相互依赖。

确实读取后必须写入?

如果您的协议不需要它,不需要。

现在,就是说,你的代码有一些问题。

在一个小问题上,WSAAccept()是同步的,您应该考虑使用AcceptEx()来代替,因此它可以使用相同的I/O完成端口来报告新的连接。

但更重要的是,当挂起的I/O操作失败,GetQueuedCompletionStatus()返回FALSE,返回LPOVERLAPPED指针将非NULL,以及GetLastError()将报告为什么I/O操作失败。但是,如果GetQueuedCompletionStatus()本身失败,则返回的LPOVERLAPPED指针将为空,并且GetLastError()将报告为什么GetQueuedCompletionStatus()失败。这种差异在documentation中有明确说明,但您的while循环未考虑它。使用do..while循环,而不是和行为根据LPOVERLAPPED指针:

DWORD WINAPI completionPortThreadProc(LPVOID param) 
{ 
    DWORD bytesTransferred = 0; 
    ULONG_PTR completionKey = NULL; 
    LPPER_IO_DATA perIoData = NULL; 

    do 
    { 
     if(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE)) 
     { 
      // I/O success, handle perIoData based on completionKey as needed... 
     } 
     else if(perIoData) 
     { 
      // I/O failed, handle perIoData based on completionKey as needed... 
     } 
     else 
     { 
      // GetQueuedCompletionStatus() failure... 
      break; 
     }  
    } 
    while(WaitForSingleObject(exitEvent, 0) == WAIT_TIMEOUT); 

    return 0; 
} 

在一个侧面说明,而不是使用一个事件对象发出信号时completionPortThreadProc()应该退出,可以考虑使用PostQueuedCompletionionStatus()而不是终止completionKey邮寄到我/ O完成端口,那么你的循环可以查找值:

DWORD WINAPI completionPortThreadProc(LPVOID param) 
{ 
    DWORD bytesTransferred = 0; 
    ULONG_PTR completionKey = NULL; 
    LPPER_IO_DATA perIoData = NULL; 

    do 
    { 
     if(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE)) 
     { 
      if(completionKey == MyTerminateKey) 
       break; 

      if(completionKey == MySocketIOKey) 
      { 
       // I/O success, handle perIoData as needed... 
      } 
     } 
     else if(perIoData) 
     { 
      // I/O failed, handle perIoData based on completionKey as needed... 
     } 
     else 
     { 
      // GetQueuedCompletionStatus() failure... 
      break; 
     }  
    } 
    while(true); 

    return 0; 
} 

CreateIoCompletionPort((HANDLE)acceptSocket, completionPort, MySocketIOKey, 0); 

PostQueuedCompletionStatus(completionPort, 0, MyTerminateKey, NULL); 
+0

嗨雷米, 你长的解释非常感谢,这是非常大加赞赏,我要疯了这里。我会通过您的意见并回报!我从其他例子中取得了相当数量的代码。 – CAM79

+0

嗨,雷米,好吧,新循环是关键,并专门检查'if(perIoData)'中的GetLastError。 I/O由于WSA_OPERATION_ABORTED失败而未能完成。我有一个接受线程发布了阅读,然后结束了。我原以为它仍然可以工作,但显然我的设计是错误的。 非常感谢您的帮助,我会在您的新设计中使用您的想法。 希望您的意见也可以帮助其他人,因为很多示例似乎都使用我的代码。 – CAM79

+0

当一个线程终止时,它所启动的任何I/O操作都将自动中止。你应该在一个线程内的一个循环中调用'WSAAccept()',该线程至少在监听套接字的生命周期内存在(或者将客户端接受移动到IO完成端口并发出来自这样的线程的初始接受) Os不会中止.. –