2012-11-27 133 views
5

我想創建一個專用線程,專門用於使用boost庫(asio)從UDP套接字接收數據。這個線程應該是由從UDP套接字接收到的一些數據觸發的無限循環。在我的應用程序中,我需要使用異步接收操作。boost asio udp套接字async_receive_from不會調用處理程序

如果我使用同步函數receive_from,一切都按預期工作。

但是,如果我使用async_receive_from處理程序永遠不會被調用。由於我使用信號量來檢測是否收到了一些數據,因此程序會鎖定並且從不觸發循環。

我已驗證(使用網絡分析器)發件人設備在UDP套接字上正確發送數據。

我在下面的代碼中隔離了這個問題。

#include <boost\array.hpp> 
#include <boost\asio.hpp> 
#include <boost\thread.hpp> 
#include <boost\interprocess\sync\interprocess_semaphore.hpp> 

#include <iostream> 

typedef boost::interprocess::interprocess_semaphore Semaphore; 

using namespace boost::asio::ip; 

class ReceiveUDP 
{ 
public: 

    boost::thread* m_pThread; 

    boost::asio::io_service   m_io_service; 
    udp::endpoint     m_local_endpoint; 
    udp::endpoint     m_sender_endpoint; 

    udp::socket      m_socket; 

    size_t  m_read_bytes; 
    Semaphore m_receive_semaphore; 

    ReceiveUDP() : 
     m_socket(m_io_service), 
     m_local_endpoint(boost::asio::ip::address::from_string("192.168.0.254"), 11), 
     m_sender_endpoint(boost::asio::ip::address::from_string("192.168.0.11"), 5550), 
     m_receive_semaphore(0) 
    { 
     Start(); 
    } 

    void Start() 
    { 
     m_pThread = new boost::thread(&ReceiveUDP::_ThreadFunction, this); 
    } 

    void _HandleReceiveFrom(
     const boost::system::error_code& error, 
     size_t         received_bytes) 
    { 
     m_receive_semaphore.post(); 

     m_read_bytes = received_bytes; 
    } 

    void _ThreadFunction() 
    { 
     try 
     { 
      boost::array<char, 100> recv_buf; 

      m_socket.open(udp::v4()); 
      m_socket.bind(m_local_endpoint); 
      m_io_service.run(); 

      while (1) 
      { 
#if 1 // THIS WORKS 

       m_read_bytes = m_socket.receive_from(
        boost::asio::buffer(recv_buf), m_sender_endpoint); 

#else // THIS DOESN'T WORK 

       m_socket.async_receive_from(
        boost::asio::buffer(recv_buf), 
        m_sender_endpoint, 
        boost::bind(&ReceiveUDP::_HandleReceiveFrom, this, 
        boost::asio::placeholders::error, 
        boost::asio::placeholders::bytes_transferred)); 

       /* The program locks on this wait since _HandleReceiveFrom 
       is never called. */ 
       m_receive_semaphore.wait(); 

#endif 

       std::cout.write(recv_buf.data(), m_read_bytes); 
      } 

      m_socket.close(); 
     } 
     catch (std::exception& e) 
     { 
      std::cerr << e.what() << std::endl; 
     } 
    } 
}; 

void main() 
{ 
    ReceiveUDP receive_thread; 

    receive_thread.m_pThread->join(); 
} 

在旗語甲TIMED_WAIT是優選,然而用於調試目的我使用了一個阻塞等待如在上面的代碼。

我錯過了什麼嗎?我的錯誤在哪裏?

回答

7

您撥打io_service.run()正在退出,因爲io_service沒有工作要做。代碼然後進入while循環並呼叫m_socket.async_receive_from。此時io_service未運行,它從不讀取數據並調用處理程序。

你需要安排工作調用io_service對象運行前的事:

即:

// Configure io service 
ReceiveUDP receiver; 

m_socket.open(udp::v4()); 
m_socket.bind(m_local_endpoint); 
m_socket.async_receive_from(
    boost::asio::buffer(recv_buf), 
    m_sender_endpoint, 
    boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver, 
    boost::asio::placeholders::error, 
    boost::asio::placeholders::bytes_transferred)); 

處理函數將執行以下操作:

// start the io service 
void HandleReceiveFrom(
    const boost::system::error_code& error, 
    size_t received_bytes) 
{ 
    m_receive_semaphore.post(); 

    // schedule the next asynchronous read 
    m_socket.async_receive_from(
     boost::asio::buffer(recv_buf), 
     m_sender_endpoint, 
     boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver, 
     boost::asio::placeholders::error, 
     boost::asio::placeholders::bytes_transferred)); 

    m_read_bytes = received_bytes; 
} 

你的線程則只需等待對於信號量:

while (1) 
{ 
    m_receive_semaphore.wait(); 
    std::cout.write(recv_buf.data(), m_read_bytes); 
} 

備註:

  1. 你真的需要這個額外的線程嗎?處理程序是完全異步的,boost :: asio可用於管理線程池(請參閱:think-async
  2. 請不要使用下劃線,後跟大寫字母表示變量/函數名稱。他們保留。
+0

非常感謝!我根據你的建議修改了代碼,一切正常。 我已經在線程創建之前配置了IO服務。到io_service.run()的調用只是線程創建後: \t無效的start() \t { \t \t m_socket.open(UDP :: V4()); \t \t m_socket.bind(m_local_endpoint); \t \t StartRead(); \t \t m_pThread = new boost :: thread(&ReceiveUDP :: _ ThreadFunction,this); \t \t m_io_service.run(); \t} 其中StartRead()是對async_receive_from的調用。 再次感謝。 – arms

+0

謝謝你,我瘋了。 – Alex

0

m_io_service.run()立即返回,所以沒人發送完成處理程序。請注意,io_service::run是一種基於asio的應用程序的「消息循環」,只要您希望asio功能可用,它就應該運行(這是一個簡單的描述,但對於您的情況已經足夠了)。

此外,您不應該在循環中調用async.operation。相反,在前一個完成處理程序中發出後續的async.operation,以確保2個async.reads不會同時運行。

請參閱asio示例以查看典型的asio應用程序設計。

相關問題