2015-06-26 28 views
1

在boost :: asio :: async_read_until之後,我遇到了處理程序調用的週期性無限等待。這似乎在兩種情況下:asio_read_until的無限執行async_read_until

  • 在服務器上,但客戶端的boost ::支持ASIO :: ASYNC_WRITE處理程序 叫沒有任何錯誤。此行爲僅在客戶端 在服務器上連接後發送其第一個命令時出現。
  • 在客戶端,經過幾分鐘的命令交換後,在這種情況下, 服務器端調用boost :: asio :: async_write處理程序,但客戶端 繼續等待boost :: asio :: async_read_until處理程序。

鑑於沒有每一次,我認爲我錯誤地使用async_read_until,async_write和boost :: asio :: strand。

這裏是簡化服務器的模式:

  • 閱讀插座下鏈
  • 接受新的命令後,另一個異步讀取開始,接收到的命令被另一個線程異步處理(且無鏈)
  • 處理命令後,結果被髮回到同一個套接字使用async_write在股

(一些命令到服務器不要求紅色響應,有時需要在服務器發生某些事件後向客戶端發送命令,即不從客戶端獲取數據。這是值得澄清,同一插座上不能同時運行一個以上的async_read_until運行多個ASYNC_WRITE操作)

和客戶端的一個:

  • 連接發送命令到服務器之後
  • 從服務器接收指令後,傳送一個答案

服務器的代碼:

#include <iostream> 
#include <istream> 
#include <ostream> 
#include <string> 

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/enable_shared_from_this.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread.hpp> 

class server_session 
    : public boost::enable_shared_from_this<server_session> 
{ 
public: 
    server_session(boost::asio::io_service& io_service) 
     : _io_service(io_service) 
     , _socket(io_service) 
     , _strand(io_service) 
     , _delimiter('\b') 
    {} 

    boost::asio::ip::tcp::socket& socket() 
    { 
     return _socket; 
    } 

    void read() 
    { 
     boost::asio::async_read_until(
      _socket, 
      _streambuf, 
      _delimiter, 
      _strand.wrap(
       boost::bind(
        &server_session::handle_read, 
        shared_from_this(), 
        boost::asio::placeholders::error, 
        boost::asio::placeholders::bytes_transferred 
        ) 
       ) 
      ); 
    } 

    void handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred) 
    { 
     if (ec) 
     { 
      std::cerr << "async_read_until error: " << ec.message() << std::endl; 
      return; 
     } 

     std::istream is(&_streambuf); 
     std::string msg; 
     std::getline(is, msg, _delimiter); 

     read(); 

     _io_service.post(
      boost::bind(
       &server_session::proc_msg, 
       shared_from_this(), 
       msg 
       ) 
      ); 
    } 

    void proc_msg(const std::string msg) 
    { 
     // command proc here 
     std::cout << "received: " << msg << std::endl; 

     static std::uint64_t i = 0; 
     write("resp_" + std::to_string(i++)); 
    } 

    void write(const std::string& msg) 
    { 
     _strand.post(
      boost::bind(
       &server_session::write_impl, 
       shared_from_this(), 
       boost::shared_ptr<std::string>(new std::string(msg + _delimiter)) 
       ) 
      ); 
    } 

private: 
    void write_impl(const boost::shared_ptr<std::string>& text_ptr) 
    { 
     boost::asio::async_write(
      _socket, 
      boost::asio::buffer(text_ptr->data(), text_ptr->size()), 
      _strand.wrap(
        boost::bind(
         &server_session::handle_write, 
         shared_from_this(), 
         text_ptr, 
         boost::asio::placeholders::error 
         ) 
        ) 
      ); 
    } 

    void handle_write(const boost::shared_ptr<std::string>, const boost::system::error_code& ec) 
    { 
     if (ec) 
     {    
      std::cerr << "async_write error: " << ec.message() << std::endl; 
     } 
    } 

    boost::asio::io_service& _io_service; 
    boost::asio::ip::tcp::socket _socket; 
    boost::asio::strand _strand; 
    boost::asio::streambuf _streambuf; 
    char _delimiter; 
}; 

class server 
{ 
public: 
    server(int port) 
     : _acceptor(
      _io_service, 
      boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),port) 
      ) 
    { 
     start_accept(); 

     boost::thread_group thd_grp; 

     for (int i = 0; i < 2; ++i) 
     { 
      thd_grp.create_thread(
       boost::bind(&boost::asio::io_service::run, &_io_service) 
       ); 
     } 

     thd_grp.join_all(); 
    } 

private: 
    void start_accept() 
    { 
     _session_ptr.reset(new server_session(_io_service)); 

     _acceptor.async_accept(
      _session_ptr->socket(), 
      boost::bind(
       &server::handle_accept, 
       this, 
       boost::asio::placeholders::error 
       ) 
      ); 
    } 

    void server::handle_accept(const boost::system::error_code& ec) 
    { 
     if (ec) 
     { 
      std::cerr << "handle_accept error: " << ec.message() << std::endl; 
      return; 
     } 

     _session_ptr->read(); 
     start_accept(); 
    } 

    boost::asio::io_service _io_service; 
    boost::asio::ip::tcp::acceptor _acceptor; 
    boost::shared_ptr<server_session> _session_ptr; 
}; 

int main(int argc, char** argv) 
{ 
    if (argc != 2) 
    { 
     std::cerr << "usage: " << argv[0] << " <port>"; 
     return EXIT_FAILURE; 
    } 

    server s(std::stoi(argv[1])); 
    return EXIT_SUCCESS; 
} 

客戶端的代碼:

#include <iostream> 
#include <istream> 
#include <ostream> 
#include <string> 

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/shared_ptr.hpp> 

class client 
{ 
public: 
    client(const std::string& host, const std::string& port) 
     : _socket(_io_service) 
     , _resolver(_io_service) 
     , _query(host, port) 
     , _delimiter('\b') 
    { 
     _iterator = _resolver.resolve(_query); 

     boost::asio::async_connect(
      _socket, 
      _iterator, 
      boost::bind(
       &client::handle_connect, 
       this, 
       boost::asio::placeholders::error 
       ) 
      ); 

     _io_service.run(); 
    } 

    void handle_connect(const boost::system::error_code& ec) 
    { 
     if (ec) 
     { 
      std::cerr << "async_connect error: " << ec.message() << std::endl; 
      return; 
     } 

     write(); 
    } 

    void write() 
    { 
     static std::uint64_t i = 0; 
     boost::shared_ptr<std::string> msg_ptr(new std::string("req_" + std::to_string(i++) + _delimiter)); 

     boost::asio::async_write(
      _socket, 
      boost::asio::buffer(msg_ptr->data(), msg_ptr->size()), 
      boost::bind(
       &client::handle_write, 
       this, 
       msg_ptr, 
       boost::asio::placeholders::error 
       ) 
      ); 
    } 

    void handle_write(const boost::shared_ptr<std::string>, const boost::system::error_code& ec) 
    { 
     if (ec) 
     { 
      std::cerr << "async_write error: " << ec.message() << std::endl; 
      return; 
     } 

     read(); 
    } 

    void read() 
    { 
     boost::asio::async_read_until(
      _socket, 
      _streambuf, 
      _delimiter, 
      boost::bind(
       &client::handle_read, 
       this, 
       boost::asio::placeholders::error, 
       boost::asio::placeholders::bytes_transferred 
       ) 
      ); 
    } 

    void handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred) 
    { 
     if (ec) 
     {    
      std::cerr << "async_read_until error: " << ec.message() << std::endl; 
      return; 
     } 

     std::istream is(&_streambuf); 
     std::string msg; 
     std::getline(is, msg, _delimiter); 

     std::cout << "received: " << msg << std::endl; 

     write(); 
    } 

    boost::asio::io_service _io_service;  
    boost::asio::ip::tcp::socket _socket; 
    boost::asio::ip::tcp::resolver _resolver; 
    boost::asio::ip::tcp::resolver::query _query; 
    boost::asio::ip::tcp::resolver::iterator _iterator; 
    boost::asio::streambuf _streambuf; 
    char _delimiter; 
}; 

int main(int argc, char** argv) 
{ 
    if (argc != 3) 
    { 
     std::cerr << "usage: " << argv[0] << " <host> <port>"; 
     return EXIT_FAILURE; 
    } 

    client c(argv[1], argv[2]); 
    return EXIT_SUCCESS; 
} 

它是正確的,而在插座上async_read_until已經運行到ASYNC_WRITE,反之使用鏈?

是否有可能,async_read_until以某種方式內部阻塞套接字,實際上,數據不會發送到客戶端?

我應當承擔任何建議感激爲什麼代碼可能無法正常工作

我使用升壓1.58,平臺 - Win7和Win8的。以上結果對本地主機和局域網進行了測試。

回答

0

我可以告訴你,至少你應該使用asio :: deadline_timer,並在每次開始新的異步操作時重置計時器。您應該特別使用async_read_until這樣的操作來執行此操作,因爲您需要指定可能永遠不會滿足的調用完成條件。查找deadline_timer示例,並對每個異步操作執行一次async_wait,指定該操作的最後期限。在超時操作中,您可以取消掛起的異步操作,錯誤輸出或重新啓動,只要適當。

+0

據我所知,如果我將使用socket :: cancel一些已經收到,但不讀取數據可能會丟失。可能是如果我改變boost :: asio :: async_read_until與幾個async_read調用問題消失? – gukobanetr

+0

@gukobanetr是的,調用取消將導致io失敗並且不能正確完成。我只會使用async_read_until,如果你絕對必須(比如閱讀HTTP Headers),因爲有很多問題伴隨着這個問題,比如條件不被滿足並且有永恆的等待,streambuf對象的怪異(不正確的使用和缺乏強制準備緩衝區序列會導致性能下降)等等,如果你不必這樣做,你就不需要採取很多的措施。 –

+0

我是否可以避免使用例如async_read_until套接字時可能產生的副作用basic_stream_socket :: async_receive方法? (可能每個async_receive調用讀取一個字節並將其附加到我的緩衝區) – gukobanetr