2017-05-07 60 views
0

我正在研究一個簡單的TCP服務器,它讀取並將消息寫入線程安全隊列。然後應用程序可以使用這些隊列安全地讀取和寫入套接字,即使是從不同的線程。加速asio異步讀取和使用隊列寫入套接字

我面臨的問題是我不能async_read。我的隊列有pop操作,它返回下一個要處理的元素,但是如果沒有可用的元素則會阻止它。所以一旦我呼叫POP async_read回調當然不會被解僱了。有沒有一種方法可以將這樣的隊列整合到boost asio中,還是必須完全重寫?

下面是我用來展示我遇到的問題的一個簡短示例。一旦建立了TCP連接,我將創建一個新的線程,該線程將在該tcp_connection下運行該應用程序。之後我想開始async_readasync_write。我一直在這裏打破了我的頭幾個小時,我真的不知道如何解決這個問題。

class tcp_connection : public std::enable_shared_from_this<tcp_connection> 
{ 
public: 
    static std::shared_ptr<tcp_connection> create(boost::asio::io_service &io_service) { 
     return std::shared_ptr<tcp_connection>(new tcp_connection(io_service)); 
    } 

    boost::asio::ip::tcp::socket& get_socket() 
    { 
     return this->socket; 
    } 

    void app_start() 
    { 
     while(1) 
     { 
      // Pop is a blocking call. 
      auto inbound_message = this->inbound_messages.pop(); 
      std::cout << "Got message in app thread: " << inbound_message << ". Sending it back to client." << std::endl; 
      this->outbound_messages.push(inbound_message); 
     } 
    } 

    void start() { 
     this->app_thread = std::thread(&tcp_connection::app_start, shared_from_this()); 

     boost::asio::async_read_until(this->socket, this->input_stream, "\r\n", 
      strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 

     // Start async writing here. The message to send are in the outbound_message queue. But a Pop operation blocks 
     // empty() is also available to check whether the queue is empty. 
     // So how can I async write without blocking the read. 
     // block... 
     auto message = this->outbound_messages.pop(); 
     boost::asio::async_write(this->socket, boost::asio::buffer(message), 
      strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 

    void handle_read(const boost::system::error_code& e, size_t bytes_read) 
    { 
     std::cout << "handle_read called" << std::endl; 
     if (e) 
     { 
      std::cout << "Error handle_read: " << e.message() << std::endl; 
      return; 
     } 
     if (bytes_read != 0) 
     { 
      std::istream istream(&this->input_stream); 
      std::string message; 
      message.resize(bytes_read); 
      istream.read(&message[0], bytes_read); 
      std::cout << "Got message: " << message << std::endl; 
      this->inbound_messages.push(message); 
     } 
     boost::asio::async_read_until(this->socket, this->input_stream, "\r\n", 
      strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 

    void handle_write(const boost::system::error_code& e, size_t /*bytes_transferred*/) 
    { 
     if (e) 
     { 
      std::cout << "Error handle_write: " << e.message() << std::endl; 
      return; 
     } 

     // block... 
     auto message = this->outbound_messages.pop(); 
     boost::asio::async_write(this->socket, boost::asio::buffer(message), 
      strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 



private: 
    tcp_connection(boost::asio::io_service& io_service) : socket(io_service), strand(io_service) 
    { 
    } 

    boost::asio::ip::tcp::socket socket; 
    boost::asio::strand strand; 
    boost::asio::streambuf input_stream; 

    std::thread app_thread; 

    concurrent_queue<std::string> inbound_messages; 
    concurrent_queue<std::string> outbound_messages; 
}; 

class tcp_server 
{ 
public: 
    tcp_server(boost::asio::io_service& io_service) 
     : acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9001)) 
    { 
     start_accept(); 
    } 

private: 
    void start_accept() 
    { 
     std::shared_ptr<tcp_connection> new_connection = 
      tcp_connection::create(acceptor.get_io_service()); 

     acceptor.async_accept(new_connection->get_socket(), 
      boost::bind(&tlcp_tcp_server::handle_accept, this, new_connection, boost::asio::placeholders::error)); 
    } 

    void handle_accept(std::shared_ptr<tcp_connection> new_connection, 
         const boost::system::error_code& error) 
    { 
     if (!error) 
     { 
      new_connection->start(); 
     } 

     start_accept(); 
    } 

    boost::asio::ip::tcp::acceptor acceptor; 
}; 

回答

2

,如果你想要一個async_pop方法,這需要一個錯誤消息佔位符,並且回調處理程序在我看來。當您收到消息時,請檢查是否有未完成的處理程序,如果有,請彈出消息,取消註冊處理程序並調用它。同樣,當註冊async_pop時,如果已經有消息等待,請彈出消息併發送調用處理程序而不註冊它。

您可能想從pop_operation或類似的多態基本庫中派生async_pop類。

+0

謝謝!我沒有想到我可以自己創建一個像這樣的處理程序。就像我使用'strand.wrap'這樣的lang應該可以正常工作。我有一個問題,但爲什麼我需要錯誤消息佔位符? –

+0

@JohnSmith我的錯誤。你需要一個消息佔位符(std :: string&?),你希望將來自上一次不成功的io讀操作的任何錯誤代碼傳遞迴異步處理程序,以防消息隊列耗盡並且讀取錯誤。 –

+0

是的,我現在已經實施了。在代碼中,它不是一個字符串,但用示例字符串演示是最簡單的。 Tommorow我會在另一個答案中發佈代碼,但我會保持你的接受。只爲那些想知道同樣事情的人。 –