2016-06-07 41 views
4

我有以下簡化的IO完成端口服務器C++代碼:IO完成端口初始讀取和雙向數據

int main(..) 
{ 
    startCompletionPortThreadProc(); 

    // Await client connection 

    sockaddr_in clientAddress; 
    int clientAddressSize = sizeof(clientAddress); 
    SOCKET acceptSocket = WSAAccept(serverSocket, (SOCKADDR*)&clientAddress, &clientAddressSize, NULL, NULL); 

    // Connected 

    CreateIoCompletionPort((HANDLE)acceptSocket, completionPort, 0, 0); 

    // Issue initial read 
    read(acceptSocket); 
} 


DWORD WINAPI completionPortThreadProc(LPVOID param) 
{ 
    DWORD bytesTransferred = 0; 
    ULONG_PTR completionKey = NULL; 
    LPPER_IO_DATA perIoData = NULL; 

    while(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE)) 
    { 
     if(WaitForSingleObject(exitEvent, 0) == WAIT_OBJECT_0) 
     { 
      break; 
     } 

     if(!perIoData) 
      continue; 

     if(bytesTransferred == 0) 
     { 
      //TODO 
     } 

     switch(perIoData->operation) 
     { 
      case OPERATION_READ: 
      { 
       // Bytes have been received 

       if(bytesTransferred < perIoData->WSABuf.len) 
       { 
        // Terminate string 
        perIoData->WSABuf.buf[bytesTransferred] = '\0'; 
        perIoData->WSABuf.buf[bytesTransferred+1] = '\0'; 
       } 

       // Add data to message build 
       message += std::tstring((TCHAR*)perIoData->WSABuf.buf); 

       // Perform next read 
        perIoData->WSABuf.len = sizeof(perIoData->inOutBuffer); 
        perIoData->flags = 0; 

        if(WSARecv(perIoData->socket, &(perIoData->WSABuf), 1, &bytesTransferred, &(perIoData->flags), &(perIoData->overlapped), NULL) == 0) 
        { 
         // Part message 
         continue; 
        } 

        if(WSAGetLastError() == WSA_IO_PENDING) 
        { 
         // End of message 
//TODO: Process message here 
         continue; 
        } 
       } 
      } 
      break; 

      case OPERATION_WRITE: 
      { 
       perIoData->bytesSent += bytesTransferred; 

       if(perIoData->bytesSent < perIoData->bytesToSend) 
       { 
        perIoData->WSABuf.buf = (char*)&(perIoData->inOutBuffer[perIoData->bytesSent]); 
        perIoData->WSABuf.len = (perIoData->bytesToSend - perIoData->bytesSent); 
       } 
       else 
       { 
        perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer; 
        perIoData->WSABuf.len = _tcslen(perIoData->inOutBuffer) * sizeof(TCHAR); 
        perIoData->bytesSent = 0; 
        perIoData->bytesToSend = perIoData->WSABuf.len; 
       } 

       if(perIoData->bytesToSend) 
       { 
        if(WSASend(perIoData->socket, &(perIoData->WSABuf), 1, &bytesTransferred, 0, &(perIoData->overlapped), NULL) == 0) 
         continue; 

        if(WSAGetLastError() == WSA_IO_PENDING) 
         continue; 
       } 
      } 
      break; 
     } 
    } 

    return 0; 
} 

bool SocketServer::read(SOCKET socket, HANDLE completionPort) 
{ 
    PER_IO_DATA* perIoData = new PER_IO_DATA; 
    ZeroMemory(perIoData, sizeof(PER_IO_DATA)); 

    perIoData->socket   = socket; 
    perIoData->operation   = OPERATION_READ; 
    perIoData->WSABuf.buf  = (char*)perIoData->inOutBuffer; 
    perIoData->WSABuf.len  = sizeof(perIoData->inOutBuffer); 
    perIoData->overlapped.hEvent = WSACreateEvent(); 

    DWORD bytesReceived = 0; 
    if(WSARecv(perIoData->socket, &(perIoData->WSABuf), 1, &bytesReceived, &(perIoData->flags), &(perIoData->overlapped), NULL) == SOCKET_ERROR) 
    { 
     int gle = WSAGetLastError(); 
     if(WSAGetLastError() != WSA_IO_PENDING) 
     { 
      delete perIoData; 
      return false; 
     } 
    } 

    return true; 
} 

bool SocketServer::write(SOCKET socket, std::tstring& data) 
{ 
    PER_IO_DATA* perIoData = new PER_IO_DATA; 
    ZeroMemory(perIoData, sizeof(PER_IO_DATA)); 

    perIoData->socket   = socket; 
    perIoData->operation   = OPERATION_WRITE; 
    perIoData->WSABuf.buf  = (char*)data.c_str(); 
    perIoData->WSABuf.len  = _tcslen(data.c_str()) * sizeof(TCHAR); 
    perIoData->bytesToSend  = perIoData->WSABuf.len; 
    perIoData->overlapped.hEvent = WSACreateEvent(); 

    DWORD bytesSent = 0; 
    if(WSASend(perIoData->socket, &(perIoData->WSABuf), 1, &bytesSent, 0, &(perIoData->overlapped), NULL) == SOCKET_ERROR) 
    { 
     if(WSAGetLastError() != WSA_IO_PENDING) 
     { 
      delete perIoData; 
      return false; 
     } 
    } 

    return true; 
} 

1)第一個問題我有與初始讀取。

在客戶端連接(接受)上,我發出讀取。由於客戶端尚未發送任何數據,WSAGetLastError()是WSA_IO_PENDING,並且讀取方法返回。

當客戶端發送數據時,線程仍然停留在GetQueuedCompletionStatus調用中(因爲我認爲我需要另一個WSARecv調用?)。

我應該繼續循環讀取方法,直到數據到達?這似乎並不合邏輯,我認爲通過發佈初始讀GetQueuedCompletionStatus將在數據到達時完成。

2)我需要雙向讀寫數據而無需確認。因此我也創建了一個IOCP線程的客戶端。實際上可以用完成端口來做到這一點,還是必須在寫入之後進行讀取?

對於感覺像基本問題那樣感到抱歉,但在拖網和構建IOCP示例之後,我仍然無法回答這些問題。

非常感謝提前。

回答

2

在客戶端連接(接受)上,我發出讀取。由於客戶端尚未發送任何數據,WSAGetLastError()是WSA_IO_PENDING,並且讀取方法返回。

這是正常行爲。

當客戶端發送數據時,線程仍然停留在GetQueuedCompletionStatus調用中(因爲我認爲我需要另一個WSARecv調用?)。

不,你不需要另一個電話。如果它卡住了,那麼你沒有正確地將讀取與I/O完成端口相關聯。

我應該繼續循環讀取方法,直到數據到達?

不需要。您需要一次撥打WSARecv()進行初始閱讀。 WSA_IO_PENDING錯誤表示讀數據正在等待數據,並在數據實際到達時向I/O完成端口發送信號。不要致電WSARecv()(或任何其他閱讀功能),直到該信號實際到達。然後您可以再次撥打WSARecv()以等待更多數據。重複,直到套接字斷開連接。

我認爲通過發佈初始讀取GetQueuedCompletionStatus可以在數據到達時完成。

這正是應該發生的事情。

2)我需要在沒有確認的情況下雙向讀寫數據。因此我也創建了一個IOCP線程的客戶端。實際上是否可以用完成端口來做到這一點

是的。閱讀和寫作是分開的操作,它們不相互依賴。

確實讀取後必須寫入?

如果您的協議不需要它,不需要。

現在,就是說,你的代碼有一些問題。

在一個小問題上,WSAAccept()是同步的,您應該考慮使用AcceptEx()來代替,因此它可以使用相同的I/O完成端口來報告新的連接。

但更重要的是,當掛起的I/O操作失敗,GetQueuedCompletionStatus()返回FALSE,返回LPOVERLAPPED指針將非NULL,以及GetLastError()將報告爲什麼I/O操作失敗。但是,如果GetQueuedCompletionStatus()本身失敗,則返回的LPOVERLAPPED指針將爲空,並且GetLastError()將報告爲什麼GetQueuedCompletionStatus()失敗。這種差異在documentation中有明確說明,但您的while循環未考慮它。使用do..while循環,而不是和行爲根據LPOVERLAPPED指針:

DWORD WINAPI completionPortThreadProc(LPVOID param) 
{ 
    DWORD bytesTransferred = 0; 
    ULONG_PTR completionKey = NULL; 
    LPPER_IO_DATA perIoData = NULL; 

    do 
    { 
     if(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE)) 
     { 
      // I/O success, handle perIoData based on completionKey as needed... 
     } 
     else if(perIoData) 
     { 
      // I/O failed, handle perIoData based on completionKey as needed... 
     } 
     else 
     { 
      // GetQueuedCompletionStatus() failure... 
      break; 
     }  
    } 
    while(WaitForSingleObject(exitEvent, 0) == WAIT_TIMEOUT); 

    return 0; 
} 

在一個側面說明,而不是使用一個事件對象發出信號時completionPortThreadProc()應該退出,可以考慮使用PostQueuedCompletionionStatus()而不是終止completionKey郵寄到我/ O完成端口,那麼你的循環可以查找值:

DWORD WINAPI completionPortThreadProc(LPVOID param) 
{ 
    DWORD bytesTransferred = 0; 
    ULONG_PTR completionKey = NULL; 
    LPPER_IO_DATA perIoData = NULL; 

    do 
    { 
     if(GetQueuedCompletionStatus(completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE)) 
     { 
      if(completionKey == MyTerminateKey) 
       break; 

      if(completionKey == MySocketIOKey) 
      { 
       // I/O success, handle perIoData as needed... 
      } 
     } 
     else if(perIoData) 
     { 
      // I/O failed, handle perIoData based on completionKey as needed... 
     } 
     else 
     { 
      // GetQueuedCompletionStatus() failure... 
      break; 
     }  
    } 
    while(true); 

    return 0; 
} 

CreateIoCompletionPort((HANDLE)acceptSocket, completionPort, MySocketIOKey, 0); 

PostQueuedCompletionStatus(completionPort, 0, MyTerminateKey, NULL); 
+0

嗨雷米, 你長的解釋非常感謝,這是非常大加讚賞,我要瘋了這裏。我會通過您的意見並回報!我從其他例子中取得了相當數量的代碼。 – CAM79

+0

嗨,雷米,好吧,新循環是關鍵,並專門檢查'if(perIoData)'中的GetLastError。 I/O由於WSA_OPERATION_ABORTED失敗而未能完成。我有一個接受線程發佈了閱讀,然後結束了。我原以爲它仍然可以工作,但顯然我的設計是錯誤的。 非常感謝您的幫助,我會在您的新設計中使用您的想法。 希望您的意見也可以幫助其他人,因爲很多示例似乎都使用我的代碼。 – CAM79

+0

當一個線程終止時,它所啓動的任何I/O操作都將自動中止。你應該在一個線程內的一個循環中調用'WSAAccept()',該線程至少在監聽套接字的生命週期內存在(或者將客戶端接受移動到IO完成端口併發出來自這樣的線程的初始接受) Os不會中止.. –