2013-10-02 24 views
0

我非常喜歡boost :: asio,所以請幫助我。boost :: asio如何從客戶端異步讀取數據並定期寫入數據(如果有的話)

我需要編寫單線程TCP服務器。服務器應該接受客戶端連接並不斷從客戶端套接字讀取輸入數據。定期服務器應該向客戶端發送數據。所以我有一些這樣的問題 - 所有的例子說明情況時,我們總是有循環

  1. async_receive()
  2. on_receive() -> async_write()
  3. on_write() -> 回到第 1 步 :)

所以我的決定是使用計時器來檢查要發送到套接字的數據。

我寫了一個測試服務器，發現非常奇怪的行爲——如果客戶端逐個連接、做一些操作，並彼此間隔一段時間依次斷開，一切正常。但是，如果所有客戶端同時斷開連接，就會出現定時器處理程序試圖訪問已被銷毀對象的成員（鎖定臨界區）的情況。

我無法形容爲什麼!請幫忙 !

[這部影片展示它是如何轉載](http://www.youtube.com/watch?v=NMWkD7rqf7Y&feature=youtu.be 「1080」)

謝謝!

#include <boost/none.hpp> 
#include <boost/bind.hpp> 
#include <boost/asio.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/enable_shared_from_this.hpp> 

#include <iostream> 




using namespace boost::asio; 
using namespace boost::posix_time; 



class CIncommingConnection ; 
typedef boost::shared_ptr<CIncommingConnection> CIncommingConnectionPtr; 


struct IIncomingServer 
{ 
    virtual void OnData(CIncommingConnectionPtr pConn, const char *pData, size_t bytes) = 0; 
    virtual void OnConnected(CIncommingConnectionPtr pConn) = 0; 
    virtual void OnDisconnected(const boost::system::error_code& err, CIncommingConnectionPtr pConn) = 0; 
}; 



class CAutoLock 
{ 
public: 
    CAutoLock(CRITICAL_SECTION &cs) : 
     m_cs(cs) 
    { 
     ::EnterCriticalSection(&m_cs); 
    } 

    ~CAutoLock() 
    { 
     ::LeaveCriticalSection(&m_cs); 
    } 

private: 
    CRITICAL_SECTION &m_cs; 
}; 

class CIncommingConnection : public boost::enable_shared_from_this<CIncommingConnection>      
          ,boost::noncopyable 
{ 
public: 

    CIncommingConnection(const std::string sPeerName, boost::asio::io_service &service, IIncomingServer *pServer) : 
    m_service(service) 
    ,sock_(service) 
    ,m_sPeerName(sPeerName) 
    ,m_pServer(pServer) 
    ,m_timer(service) 
    { 
     ::InitializeCriticalSection(&m_cs); 

     std::cout << "CIncommingConnection()" << std::endl ; 
    } 


    ~CIncommingConnection() 
    { 
     std::cout << "CIncommingConnection()~" << std::endl ; 
     ::DeleteCriticalSection(&m_cs); 
    } 


    ip::tcp::socket & sock() 
    { 
     return sock_; 
    } 



    void start() 
    { 
     m_pServer->OnConnected(shared_from_this()); 
     do_read(); 
     wait_for_outgoingdata(); 
    } 


private: 

    void stop() 
    {  
     sock_.close(); 
     m_timer.cancel(); 
    } 



    void do_read() 
    { 
     sock_.async_receive(buffer(read_buffer_), boost::bind(&CIncommingConnection::handler_read, this, _1, _2)); 
    } 



    void do_error(const boost::system::error_code& error) 
    { 
     CIncommingConnectionPtr pConn = shared_from_this(); 

     stop() ; 

     m_pServer->OnDisconnected(error, pConn); 
    } 



    void handler_read(const boost::system::error_code& error, std::size_t bytes) 
    { 
     if (error) 
     { 
      do_error(error); 
      return ; 
     } 

     CIncommingConnectionPtr pConn = shared_from_this() ; 

     m_pServer->OnData(pConn, read_buffer_, bytes); 

     do_read(); 
    } 



    void wait_for_outgoingdata() 
    { 
     m_timer.expires_from_now(boost::posix_time::millisec(100)); 
     m_timer.async_wait(boost::bind(&CIncommingConnection::on_output_queue_timer, this, _1)); 
    } 



    void on_output_queue_timer(const boost::system::error_code& error) 
    { 
     if (error == boost::asio::error::operation_aborted) 
     { 
      return ; 
     } 

     CAutoLock oLock(m_cs); 

     if (!m_sOutBuf.empty()) 
      sock_.async_send(buffer(m_sOutBuf), boost::bind(&CIncommingConnection::handler_write, this, _1, _2)); 
     else 
      wait_for_outgoingdata(); 
    } 


    void handler_write(const boost::system::error_code& error, std::size_t bytes) 
    {  
     if (error) 
      return ; 


     if (bytes) 
     { 
      m_sOutBuf = m_sOutBuf.substr(bytes, m_sOutBuf.length()-bytes); 
     } 

     wait_for_outgoingdata(); 
    } 



private: 
    ip::tcp::socket sock_; 

    enum { max_msg = 1024 }; 
    char read_buffer_[max_msg]; 
    char write_buffer_[max_msg]; 


    boost::asio::io_service  &m_service ; 
    std::string      m_sPeerName ; 
    std::string      m_sOutBuf; 
    CRITICAL_SECTION    m_cs ; 
    IIncomingServer    *m_pServer; 
    boost::asio::deadline_timer  m_timer; 
}; 






class CIncomingServer : public boost::enable_shared_from_this<CIncomingServer>      
         , public IIncomingServer 
         , boost::noncopyable 
{ 

public: 

    CIncomingServer(boost::asio::io_service &service, 
     unsigned int port, 
     bool bAllowManyConnections, 
     const std::string sPeerName) : 

     m_acceptor (service, ip::tcp::endpoint(ip::tcp::v4(), port), false) 
    ,m_sPeerName(sPeerName) 
    ,m_port(port) 
    ,m_service(service) 
    ,m_timer(service) 
    ,m_bAllowManyConnections(bAllowManyConnections) 
    { 
    } 



    ~CIncomingServer() 
    { 
    } 



    void run() 
    { 
     CIncommingConnectionPtr pConn (new CIncommingConnection(m_sPeerName, m_service, this)); 
     m_clients.push_back(pConn); 


     m_acceptor.async_accept(pConn->sock(), boost::bind(&CIncomingServer::handle_accept, this, _1)); 

     m_timer.expires_from_now(boost::posix_time::millisec(500)); 
     m_timer.async_wait(boost::bind(&CIncomingServer::on_timer, this)); 
    } 




private: 

    void handle_accept(const boost::system::error_code & err) 
    { 
     m_clients.back()->start(); 

     CIncommingConnectionPtr pConnNew (new CIncommingConnection(m_sPeerName, m_service, this)); 
     m_clients.push_back(pConnNew); 

     m_acceptor.async_accept(pConnNew->sock(), boost::bind(&CIncomingServer::handle_accept, this, _1)); 
    } 


    //IIncomingServer 
    virtual void OnData(CIncommingConnectionPtr pConn, const char *pData, size_t bytes) 
    { 
     std::cout << "Data received" << std::endl ; 
    } 


    virtual void OnConnected(CIncommingConnectionPtr pConn) 
    { 
     std::cout << "Client connected" << std::endl ; 
    } 


    virtual void OnDisconnected(const boost::system::error_code& err, CIncommingConnectionPtr pConn) 
    { 
     std::cout << "Client disconnected" << std::endl ; 

     auto it = std::find(m_clients.begin(), m_clients.end(), pConn) ; 
     if (it != m_clients.end()) 
     { 
      m_clients.erase(it); 
     } 

    } 



    void on_timer() 
    { 
     //if (NeedTerminate()) 
     //{ 
     // m_service.stop(); 
     // return ; 
     //} 

     m_timer.expires_from_now(boost::posix_time::millisec(500)); 
     m_timer.async_wait(boost::bind(&CIncomingServer::on_timer, this)); 
    } 



private: 
    ip::tcp::acceptor m_acceptor ; 

    std::vector<CIncommingConnectionPtr> m_clients; 
    std::string m_sPeerName ; 
    unsigned int m_port ; 
    boost::asio::io_service  &m_service ; 
    boost::asio::deadline_timer m_timer; 
    bool       m_bAllowManyConnections; 
}; 


int _tmain(int argc, _TCHAR* argv[]) 
{ 

    boost::asio::io_service service ; 


    boost::shared_ptr<CIncomingServer> pServer; 

    try 
    { 
     pServer.reset(new CIncomingServer(service, 8000, false, "BS Server"));   
     pServer->run(); 
    } 
    catch (const boost::system::system_error &err) 
    { 
     std::cout << "Error : " << err.what() << std::endl ; 
     return 0 ; 
    } 

    service.run(); 

    return 0 ; 


} 
+0

+1視頻:) –

+1

-1。這是[SSCCE](http://sscce.org/)嗎?我對此表示懷疑。 – Abyx

+1

對不起,艾比 - 但我不同意你的疑惑。例如,在這個示例中,你認爲什麼樣的細節不重要?但謝謝你的批評! – user1503944

回答

2

長話短說:你應該綁定完成處理程序,以一個shared_ptr從shared_from_this()返回,而不是普通的this(所謂shared_from_this成語)。這樣您可以確保連接對象使用壽命的正確自動管理。

從技術上講，現在會發生以下情況：do_error 導致兩個動作發生：

  1. 取消定時器（這是一個異步操作）；
  2. 從容器中移除 CIncommingConnectionPtr（這是一個同步操作）。

在點(2)連接被破壞,因爲沒有其他shared_ptr s持有它。現在timer completion handler comes ...崩潰!

+0

謝謝你的回答! 看起來這是正確的,但我無法從中得出正確的結論:-( )在將shared_from_this()傳遞給異步處理程序之後,我的程序不再崩潰,但並非所有CIncommingConnection實例都被銷燬 這看起來像我想念一些參考,而對象仍然活着 今天我將嘗試調查這種情況!非常感謝! – user1503944

+0

@ user1503944好,一目瞭然,我沒有看到任何「永恆」上面代碼中的'shared_ptr'(即使綁定到'shared_from_this'後)。也許,你的真實代碼與你發佈的不一樣。 (順便說一下,我建議你避免在緩衝區上顯式鎖定 - 相反,你可以將任何與緩衝區相關的代碼發佈到io_service上。) –

+2

@ user1503944:簡要介紹一下代碼, 'CIncommingConnection :: on_output_queue_timer()'調用鏈可能會維護引用。即使'timer.cancel()'被調用,調用鏈可能會繼續。有關此行爲的詳細信息,請參閱[文檔](http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/basic_deadline_timer/cancel/overload1.html#boost_asio.reference.basic_deadline_timer。 cancel.overload1.remarks)備註。爲了解決這個問題,考慮錯誤時返回'on_output_queue_timer()'或者'sock_.is_open()'爲false。 –

相關問題