2013-02-14 171 views
0

我使用boost 1.52.0 32位庫與OpenSSL 32位庫與非託管Visual C++ 2008爲新客戶端I正在寫信與現有的服務器進行通信。我的測試機器使用Windows 8.我正在使用同步讀取和寫入。該代碼內置於可從C#訪問的DLL中,但所有的asio調用都是在使用boost :: thread_group創建的非託管線程上完成的。boost :: asio :: write似乎不工作,而boost :: asio :: read是優秀的

我發現的是,當一個同步讀取正在等待數據時,在另一個線程中發生的同步寫入看起來被阻塞,並且不會出去 - 至少通過我編碼的方式。所以我的問題是 - 同步讀取是否需要等待另一個線程中的數據才能完成同步寫入?

我已經驗證過,如果在另一個線程中沒有掛起的讀取操作,我可以成功寫入數據。我通過在讀取之前凍結讀取的線程來做到這一點。編寫線程然後寫出一條消息。然後我解凍了讀取的線程,並且它能夠成功地從服務器讀取關於發送的消息的響應。

以下方法由create_thread方法調用來處理從服務器讀取消息脫絲:

void SSLSocket::ProcessServerRequests() 
{ 
    // This method is responsible for processing requests from a server. 
    Byte *pByte; 
    int ByteCount; 
    size_t BytesTransferred; 
    boost::system::error_code Err; 
    Byte* pReqBuf; 
    string s; 
    stringstream ss; 
    // 
    try 
    { 
     ss << "ProcessServerRequests: Worker thread: " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Enable the handlers for the handshaking. 
     IOService->run(); 
     // Wait for the handshake to be sucessfully completed. 
     do 
     { 
     Sleep(50); 
     } while (!HandShakeReady); 
     // 
     sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string(); 
     uiClientPort = pSocket->lowest_layer().remote_endpoint().port(); 
     ReqAlive = true; 
     // If the thread that handles sending msgs to all servers has not been created yet, then create that one. 
     // This thread is created just once to handle all outbound msgs to all servers. 
     WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread)); 
     // Loop until the user quits, or an error is detected. The read method should wait until there is something to read. 
     do 
     { 
     pReqBuf = BufMang.GetPtr(MsgLenBytes); 
     boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::asio::transfer_exactly(MsgLenBytes), Err); 
     if (Err) 
     { 
      s = Err.message(); 
      if ((s.find("short r")) == string::npos) 
      { 
       ss.str(""); 
       ss << "SSLSocket::ProcessServerRequests: read(1) error = " << Err.message() << "\n. Terminating.\n\n"; 
       Log.LogString(ss.str(), LogError); 
      } 
      Terminate(); 
      // Notify the client that an error has been encountered and the program needs to shut down. TBD. 
     } 
     else 
     { 
      // Get the number of bytes in the message. 
      pByte = pReqBuf; 
      B2I.B.B1 = *pByte++; 
      B2I.B.B2 = *pByte++; 
      B2I.B.B3 = *pByte++; 
      B2I.B.B4 = *pByte; 
      ByteCount = B2I.IntVal; 
      pReqBuf = BufMang.GetPtr(ByteCount); 
      // Do a synchronous read which will hang until the entire message is read off the wire. 
      BytesTransferred = boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, ByteCount), boost::asio::transfer_exactly(ByteCount), Err); 
      ss.str(""); 
      ss << "SSLSocket::ProcessServerRequests: # bytes rcvd = " << Logger::NumberToString(BytesTransferred).c_str() << " from "; 
      ss << sClientIp.c_str() << " : " << Logger::NumberToString(uiClientPort) << "\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pReqBuf, (int)BytesTransferred, DisplayInHex, LogDebug3); 
      if ((Err) || (ByteCount != BytesTransferred)) 
      { 
       if (Err) 
       { 
        ss.str(""); 
        ss << "ProcessServerRequests:read(2) error = " << Err.message() << "\n. Terminating.\n\n"; 
       } 
       else 
       { 
        ss.str(""); 
        ss << "ProcessServerRequests:read(3) error - BytesTransferred (" << Logger::NumberToString(BytesTransferred).c_str() << 
        ") != ByteCount (" << Logger::NumberToString(ByteCount).c_str() << "). Terminating.\n\n"; 
       } 
       Log.LogString(ss.str(), LogError); 
       Terminate(); 
       // Notify the client that an error has been encountered and the program needs to shut down. TBD. 
       break; 
      } 
      // Call the C# callback method that will handle the message. 
      Log.LogString("SSLSocket::ProcessServerRequests: sending msg to the C# client.\n\n", LogDebug2); 
      CallbackFunction(this, BytesTransferred, (void*)pReqBuf); 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::ProcessServerRequests: worker thread done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::ProcessServerRequests: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
    } 
} 

以下方法由create_thread方法調用來處理將消息發送到服務器:

void SSLSocket::SendWorkerThread() 
{ 
    // This method handles sending msgs to the server. It is called upon 1st time class initialization. 
    // 
    DWORD WaitResult; 
    Log.LogString("SSLSocket::SendWorkerThread: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo); 
    // Loop until the user quits, or an error of some sort is thrown. 
    try 
    { 
     do 
     { 
     // If there are one or more msgs that need to be sent to a server, then send them out. 
     if (SendMsgQ.Count() > 0) 
     { 
      Message* pMsg = SendMsgQ.Pop(); 
      // Byte* pBuf = pMsg->pBuf; 
      const Byte* pBuf = pMsg->pBuf; 
      SSLSocket* pSSL = pMsg->pSSL; 
      int BytesInMsg = pMsg->BytesInMsg; 
      boost::system::error_code Error; 
      unsigned int BytesTransferred = boost::asio::write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), Error); 
      string s = "SSLSocket::SendWorkerThread: # bytes sent = "; 
      s += Logger::NumberToString(BytesInMsg).c_str(); 
      s += "\n"; 
      Log.LogString(s, LogDebug2); 
      Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3); 
      if (Error) 
      { 
       Log.LogString("SSLSocket::SendWorkerThread: error sending message - " + Error.message() + "\n", LogError); 
      } 
     } 
     else 
     { 
      // Nothing to send, so go into a wait state. 
      WaitResult = WaitForSingleObject(hEvent, INFINITE); 
      if (WaitResult != 0L) 
      { 
       Log.LogString("SSLSocket::SendWorkerThread: WaitForSingleObject event error. Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError); 
      } 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::SendWorkerThread: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::SendWorkerThread: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
    } 
} 

因此,如果同步寫入應該能夠在另一個線程中同步讀取掛起時執行,那麼有人可以告訴我我的代碼做錯了什麼。

+0

我剛剛發現問題的原因。在將調用從同步轉換爲異步之後,它仍然表現出同樣的問題。事實證明,這是由於一個畸形的消息。服務器根本不會對格式錯誤的消息做出響應,我剛發現這些消息。所以,我猜測同步調用可能沒問題,但現在我暫時將它保持異步。 – 2013-02-20 00:26:25

回答

1

Asio套接字不是線程安全的,因此您可能無法從不同的線程訪問它。代替使用 使用async_readasync_write