2016-08-09 134 views
1

我需要建立多達三個不同的TCP連接到不同的服務器。所有這三個連接都需要不同的協議,不同的握手和不同的心跳。學習http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/example/cpp11/chat/chat_client.cpp,閱讀這裏的東西,並按照克里斯Kohlhoffs建議我試圖執行它如下。boost :: asio和多個客戶端連接使用異步

問題是,在這種體系結構中,無論我在做什麼,在調用doConnect()中的shared_from_this()時都會收到bad_weak_pointer異常。

Importent這些只是一個未運行的代碼片斷,它可能包含錯誤! Importent

我有一個包含一些基本方法的基類。

Connection.h

class Connection : public std::enable_shared_from_this<Connection> 
{ 
public: 
    //! Ctor 
    inline Connection(); 
    //! Dtor 
    inline virtual ~Connection(); 
    inline void setReconnectTime(const long &reconnectAfterMilisec) 
    { 
    m_reconnectTime = boost::posix_time::milliseconds(reconnectAfterMilisec); 
    } 
    inline void setHandshakePeriod(const long &periodInMilisec) 
    { 
    m_handshakePeriod = boost::posix_time::milliseconds(periodInMilisec); 
    } 
    virtual void doConnect() = 0; 
    virtual void stop() = 0; 
    //... and some view more... 
} 

我有一個從基類派生那麼我的三個等級。這裏只有一個(也是核心部分)來描述該方法。

ConnectionA.h

//queues which containing also the age of the messages 
typedef std::deque<std::pair<handshakeMsg, boost::posix_time::ptime>> handskMsg_queue; 
typedef std::deque<std::pair<errorcodeMsg, boost::posix_time::ptime>> ecMsg_queue; 
typedef std::deque<std::pair<A_Msg, boost::posix_time::ptime>> A_Msg_queue; 

class ConnectionA : public Connection 
{ 
public: 
    ConnectionA(); 
    ConnectionA(const std::string& IP, const int &port); 
    ConnectionA& operator=(const ConnectionA &other); 
    virtual ~ConnectionA(); 
    virtual void stop() override; 
    virtual void doConnect() override; 
    void doPost(std::string &message); 
    void doHandshake(); 
    void sendErrorCode(const int &ec); 

    std::shared_ptr<boost::asio::io_service>m_ioS; 

private: 
    std::shared_ptr<tcp::socket> m_socket; 
    std::shared_ptr<boost::asio::deadline_timer> m_deadlineTimer; // for reconnetions 
    std::shared_ptr<boost::asio::deadline_timer> m_handshakeTimer; // for heartbeats 

    void deadlineTimer_handler(const boost::system::error_code& error); 
    void handshakeTimer_handler(const boost::system::error_code& error); 
    void doRead(); 
    void doWrite(); 

    std::string m_IP; 
    int m_port; 
    handskMsg_queue m_handskMsgQueue; 
    ecMsg_queue m_ecMsgQueue; 
    A_Msg_queue m_AMsgQueue; 
} 

ConnectionA.cpp

ConnectionA::ConnectionA(const std::string &IP, const int &port) 
: m_ioS() 
, m_socket() 
, m_deadlineTimer() 
, m_handshakeTimer() 
, m_IP(IP) 
, m_port(port) 
, m_handskMsgQueue(10) 
, m_ecMsgQueue(10) 
, m_AMsgQueue(10) 
{ 
    m_ioS = std::make_shared<boost::asio::io_service>(); 
    m_socket = std::make_shared<tcp::socket>(*m_ioS); 
    m_deadlineTimer = std::make_shared<boost::asio::deadline_timer>(*m_ioS); 
    m_handshakeTimer = std::make_shared<boost::asio::deadline_timer> (*m_ioS); 
    m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error)); 
    m_handshakeTimer->async_wait(boost::bind(&ConnectionA::handshakeTimer_handler, this, boost::asio::placeholders::error)); 
} 
ConnectionA::~ConnectionA() 
{} 

void ConnectionA::stop() 
{ 
    m_ioS->post([this]() { m_socket->close(); }); 
    m_deadlineTimer->cancel(); 
    m_handshakeTimer->cancel(); 
} 

void ConnectionA::doConnect() 
{ 
    if (m_socket->is_open()){ 
    return; 
    } 
    tcp::resolver resolver(*m_ioS); 
    std::string portAsString = std::to_string(m_port); 
    auto endpoint_iter = resolver.resolve({ m_IP.c_str(), portAsString.c_str() }); 
    m_deadlineTimer->expires_from_now(m_reconnectTime); 
    // this gives me a bad_weak_pointer exception!!! 
    auto self = std::static_pointer_cast<ConnectionA>(static_cast<ConnectionA*>(this)->shared_from_this()); 
    boost::asio::async_connect(*m_socket, endpoint_iter, [this, self](boost::system::error_code ec, tcp::resolver::iterator){ 
    if (!ec) 
    { 
     doHandshake(); 
     doRead(); 
    } 
    else { 
     // don't know if async_connect can fail but set the socket to open 
     if (m_socket->is_open()){ 
     m_socket->close(); 
     } 
    } 
    }); 
} 

void ConnectionA::doRead() 
{ 
    auto self(shared_from_this()); 
    boost::asio::async_read(*m_socket, 
    boost::asio::buffer(m_readBuf, m_readBufSize), 
    [this, self](boost::system::error_code ec, std::size_t){ 
    if(!ec){ 
     // check server answer for errors 
     } 
     doRead(); 
    } 
    else { 
     stop(); 
    } 
    }); 
} 

void ConnectionA::doPost(std::string &message) 
{ 
    A_Msg newMsg (message); 
    auto self(shared_from_this()); 
    m_ioS->post([this, self, newMsg](){ 
    bool writeInProgress = false; 
    if (!m_A_MsgQueue.empty()){ 
     writeInProgress = true; 
    } 
    boost::posix_time::ptime currentTime = time_traits_t::now(); 
    m_AMsgQueue.push_back(std::make_pair(newMsg,currentTime)); 
    if (!writeInProgress) 
    { 
     doWrite(); 
    }  
    }); 
} 

void ConnectionA::doWrite() 
{ 
    while (!m_AMsgQueue.empty()) 
    { 
    if (m_AMsgQueue.front().second + m_maxMsgAge < time_traits_t::now()){ 
      m_AMsgQueue.pop_front(); 
      continue; 
    } 
    if (!m_socket->is_open()){ 
     continue; 
    } 
    auto self(shared_from_this()); 
    boost::asio::async_write(*m_socket, 
     boost::asio::buffer(m_AMsgQueue.front().first.data(), 
     m_AMsgQueue.front().first.A_lenght), 
     [this, self](boost::system::error_code ec, std::size_t /*length*/) 
    { 
     if (!ec) // successful 
     { 
     m_handshakeTimer->expires_from_now(m_handshakePeriod); // reset timer 
     m_AMsgQueue.pop_front(); 
     doWrite(); 
     } 
     else { 
     if (m_socket->is_open()){ 
      m_socket->close(); 
     } 
     } 
    }); 
    } 
} 

void ConnectionA::deadlineTimer_handler(const boost::system::error_code& error){ 
    if (m_stopped){ 
    return; 
    } 
    m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error)); 
    if (!error && !m_socket->is_open()) // timer expired and no connection was established 
    {  
    doConnect(); 
    } 
    else if (!error && m_socket->is_open()){ // timer expired and connection was established 
    m_deadlineTimer->expires_at(boost::posix_time::pos_infin); // to reactivate timer call doConnect() 
    } 
} 

最後也有其封裝這些類使其更舒適的使用另一個類:

TcpConnect.h

class CTcpConnect 
{ 
public: 
    /*! Ctor 
    */ 
    CTcpConnect(); 
    //! Dtor 
    ~CTcpConnect(); 

    void initConnectionA(std::string &IP, const int &port); 
    void initConnectionB(std::string &IP, const int &port); 
    void initConnectionC(std::string &IP, const int &port); 

    void postMessageA(std::string &message); 

    void run(); 
    void stop(); 
private: 
    ConnectionA m_AConnection; 
    ConnectionB m_BConnection; 
    ConnectionC m_CConnection; 
} 

TcpConnect.cpp

CTcpConnect::CTcpConnect() 
: m_AConnection() 
, m_BConnection() 
, m_CConnection() 
{} 

CTcpConnect::~CTcpConnect() 
{} 

void CTcpConnect::run(){ 
    [this](){ m_AConnection.m_ioS->run(); }; 
    [this](){ m_BConnection.m_ioS->run(); }; 
    [this](){ m_CConnection.m_ioS->run(); }; 
} 

void CTcpConnect::stop(){ 
    m_AConnection.stop(); 
    m_BConnection.stop(); 
    m_CConnection.stop(); 
} 

void CTcpConnect::initConnectionA(std::string &IP, const int &port) 
{ 
    m_AConnection = ConnectionA(IP, port); 
    m_AConnection.setMaxMsgAge(30000); 
    //... set some view parameter more 
    m_AConnection.doConnect(); 
} 
// initConnectionB & initConnectionC are quite the same 

void CTcpConnect::postMessageA(std::string &message) 
{ 
    m_AConnection.doWrite(message); 
} 

在我嘗試也只有一個io_service對象(對於我的做法,這將是罰款)​​開始,但持有該服務只是作爲參考了一些頭痛,因爲我的實現還需要連接的默認構造函數。現在每個連接都有自己的io服務。

任何想法如何讓這段代碼運行? 隨時爲其他架構提出建議。如果你能想出這些,一些片段會更好。我已經在這個實施過程中苦苦掙扎了數週了。我很感激每一個提示。

順便說一句我用VS12的boost 1.61。

+0

使用asio井是一門藝術。如果我告訴你,asio定時器和套接字不應該被保存在共享指針中,並且io_service引用是一個傳遞給對象的簡單引用,它首先聽起來會違反直覺,但這是方式去做吧。通過將shared_from_this()的結果傳遞給異步處理程序,您的對象可以保持自己和它們的緩衝區處於活動狀態。當你要停止對象時,請在所有活動的io對象上調用cancel()。優秀的處理程序將執行指示錯誤。 –

+0

Tnx爲您的建議。我知道我知道,實際上每個人都在說。但問題是,這給你一些限制。然而,閱讀這些帖子http://stackoverflow.com/questions/27697973/shared-from-this-causing-bad-weak-ptr和http://stackoverflow.com/questions/29623652/using-boostasioio-service-as -class-member-field我仍然希望將我的方法運行起來。 – GregPhil

回答

1

這就是問題所在:

m_AConnection = ConnectionA(IP, port); 

也就是說,ConnectionAConnectionenable_shared_from_this派生派生。這意味着ConnectionA必須實例化爲shared_from_this的共享指針才能工作。

試試這個:

void CTcpConnect::initConnectionA(std::string &IP, const int &port) 
{ 
    m_AConnection = std::make_shared<ConnectionA>(IP, port); 
    m_AConnection->setMaxMsgAge(30000); 
    //... set some view parameter more 
    m_AConnection->doConnect(); 
} 

EDIT1:

你是對的。這是問題。現在我意識到,我打電話給io-service.run()的方式完全是廢話。

這是非常罕見的使用超過一個io_service,和極其罕見的使用每個連接一個:)

但是,你知道,如果我需要投然後調用shared_from_this()?我注意到asynch_connect()可以在沒有演員的情況下正常工作。

許多Asio的例子使用shared_from_this()爲了方便起見,我舉例來說,根本沒有在我的項目中使用它。在使用Asio時,您需要小心一些規則。例如,如果lambda函數捕獲一個指向保存緩衝區的對象的共享指針,則在執行相應的回調之前,讀寫緩衝區不得被破壞,這種情況可以保持平凡。

例如,你可以做這樣的事情還有:

auto data = std::make_shared<std::vector<uint8_t>>(10); 
async_read(socket, 
      boost::asio::const_buffer(*data), 
      [data](boost::system::error_code, size_t) {}); 

這將是有效的,但將有你裏面std::vector每個讀取被分配一個新的數據性能缺點。

另一個原因是,當你看到一些你的一些lambda表達式shared_from_this()是可以看到有用的,他們往往有以下形式:

[this, self,...](...) {...} 

也就是說,你經常要使用this在他們裏面。如果您還沒有捕獲到self,則需要使用其他措施確保this在調用處理程序時未被銷燬。

+0

你說得對。這是問題。現在我意識到我調用'io_service.run()'的方式完全是垃圾。我現在看我如何能夠實現這是正確和明確的方式。然而,你知道我是否需要演員,然後調用'shared_from_this()'?我注意到''asynch_connect()'可以在沒有演員的情況下正常工作。 – GregPhil

+1

是的,正如@RichardHodges在上面評論中提到的那樣,使用Asio通常是一門藝術。編輯我的答案來解決你的問題。 –

+0

非常感謝您的詳細解釋。現在,爲什麼需要我的課程的shared_ptr更清晰。不過,我認爲這有點誤解。我是否需要像'auto self = std :: static_pointer_cast (static_cast (this) - > shared_from_this());''因爲我的類'ConnectionA'派生自'Connection',它來自'enable_share_from_this '或者足夠做'auto self = share_from_this()'。兩者似乎都有效。 – GregPhil