2017-04-02 73 views
0

我想從我的項目的boost TCP客戶端示例中創建一個客戶端類,並且我注意到有時在連接到不存在的主機時handle_connect不會被調用。async_connect不會在TCP客戶端類中調用處理程序

我在堆棧上看過類似的問題,在這裏人們忘記運行io_service或在任何任務發佈之前調用它,但我不認爲這是我的情況,因爲我剛剛啓動io_service.run()線程調用async_connect,併成功連接,網絡不可達,以及我測試過的其他一些案例工作得很好。

以下是完整的清單:

tcp_client.hpp

#ifndef TCP_CLIENT_HPP 
#define TCP_CLIENT_HPP 

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/chrono.hpp> 
#include <boost/thread/thread.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/enable_shared_from_this.hpp> 
#include <boost/make_shared.hpp> 
#include <mutex> 
#include <iostream> 
#include <iomanip> 

namespace com { 

using boost::asio::ip::tcp; 
using namespace std; 

class client : public boost::enable_shared_from_this<client> { 

private: 

    std::mutex mx_; 
    bool stopped_ = 1; 
    boost::asio::streambuf ibuf_; 
    boost::shared_ptr<boost::asio::io_service> io_service_; 
    boost::shared_ptr<boost::asio::ip::tcp::socket> sock_; 
    boost::shared_ptr<tcp::resolver::iterator> ei_; 
    std::vector<std::string> inbound_; 
    std::string host_, port_; 

public: 

    client() {} 

    void connect(std::string host, std::string port) { 
    if (!stopped_) stop(); 
    host_ = host; port_ = port; 
    io_service_.reset(new boost::asio::io_service); 
    sock_.reset(new boost::asio::ip::tcp::socket(*io_service_)); 
    ei_.reset(new tcp::resolver::iterator); 
    tcp::resolver r(*io_service_); 
    ei_ = boost::make_shared<tcp::resolver::iterator>(r.resolve(tcp::resolver::query(host_, port_))); 
    stopped_ = 0; 
    start_connect(); 
    boost::thread work(boost::bind(&client::work, shared_from_this())); 
    return; 
    } 

    bool is_running() { 
    return !stopped_; 
    } 

    void stop() { 
    stopped_ = 1; 
    sock_->close(); 
    return; 
    } 

    void send(std::string str) { 
    if (stopped_) return; 
    auto msg = boost::asio::buffer(str, str.size()); 
    boost::asio::async_write((*sock_), msg, boost::bind(&client::handle_write, shared_from_this(), _1)); 
    return; 
    } 

    std::string pull() { 
    std::lock_guard<std::mutex> lock(mx_); 
    std::string msg; 
    if (inbound_.size()>0) { 
     msg = inbound_.at(0); 
     inbound_.erase(inbound_.begin()); 
    } 
    return msg; 
    } 

    int size() { 
    std::lock_guard<std::mutex> lock(mx_); 
    return inbound_.size(); 
    } 

    void clear() { 
    std::lock_guard<std::mutex> lock(mx_); 
    inbound_.clear(); 
    return; 
    } 

private: 

    void work() { 
    if (stopped_) return; 
    std::cout<<"work in"<<std::endl; 
    io_service_->run(); 
    std::cout<<"work out"<<std::endl; 
    return; 
    } 

    void start_connect() { 
    if ((*ei_) != tcp::resolver::iterator()) { 
     std::cout<<"Trying "<<(*ei_)->endpoint()<<std::endl; 
     sock_->async_connect((*ei_)->endpoint(), boost::bind(&client::handle_connect, shared_from_this(), boost::asio::placeholders::error)); 
    } else { 
     stop(); 
    } 
    return; 
    } 

    void handle_connect(const boost::system::error_code& ec) { 
    if (stopped_) return; 

    if (!sock_->is_open()) { 
     std::cout<<"Socket closed"<<std::endl; 
     (*ei_)++; 
     start_connect(); 
    } else if (ec) { 
     std::cout<<"Connect error: "<<ec.message()<<std::endl; 
     sock_->close(); 
     (*ei_)++; 
     start_connect(); 
    } else { 
     std::cout<<"Connected to "<<(*ei_)->endpoint()<<std::endl; 
     start_read(); 
    } 

    return; 
    } 

    void start_read() { 
    if (stopped_) return; 
    boost::asio::async_read_until((*sock_), ibuf_, "", boost::bind(&client::handle_read, shared_from_this(), boost::asio::placeholders::error)); 
    return; 
    } 

    void handle_read(const boost::system::error_code& ec) { 
    std::lock_guard<std::mutex> lock(mx_); 
    if (stopped_) return; 
    if (ec) { 
     std::cout<<"Read error: "<<ec.message()<<std::endl; 
     stop(); 
     return; 
    } 

    std::string line; 
    std::istream is(&ibuf_); 
    std::getline(is, line); 
    if (!line.empty() && inbound_.size()<1000) inbound_.push_back(line); 

    start_read(); 
    return; 
    } 

private: 

    void handle_write(const boost::system::error_code& ec) { 
    if (stopped_) return; 
    if (ec) { 
     std::cout<<"Write error: "<<ec.message()<<std::endl; 
     stop(); 
     return; 
    } 
    return; 
    } 

}; 

}; 

tcp_test.cpp

#include "tcp_client.hpp" 

int main(int argc, char* argv[]) { 
    auto tcp_client = boost::shared_ptr<com::client>(new com::client); 

    try { 
    tcp_client->connect("192.168.1.15", "50000"); 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(1000)); 
    tcp_client->connect("192.168.1.20", "50000"); 
    } catch (std::exception& e) { 
    std::cerr<<"Exception: "<<e.what()<<std::endl; 
    } 

    int cnt=0; 
    while (cnt<5) { 
    std::cout<<cnt<<std::endl; 
    cnt++; 
    tcp_client->send("<test>"); 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(500)); 
    } 

    tcp_client->stop(); 

    while (tcp_client->size()>0) std::cout<<tcp_client->pull()<<std::endl; 

    return 0; 
} 

我得到的輸出連接到環回服務器時:

Trying 192.168.1.15:50000 
work in 
work out 
Trying 192.168.1.20:50000 
0 
work in 
Connected to 192.168.1.20:50000 
1 
2 
3 
4 
work out 
<test> 
<test> 
<test> 
<test> 
<test> 

192.168.1.20正如你所看到的那樣工作。 192.168.1.15並不存在,但我預料它會引發某種錯誤。相反,io_service.run()立即返回,就像async_connect永遠不會發布回調任務。也許它與端點迭代器有關而不是async_connect?

任何人都可以請解釋爲什麼會發生這樣的事情?

然後我試着在此代碼隔離問題:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/chrono.hpp> 
#include <boost/thread/thread.hpp> 

boost::asio::io_service io_svc; 
boost::asio::ip::tcp::socket sock(io_svc); 
boost::asio::ip::tcp::resolver::iterator ei; 

void work() { 
    std::cout<<"work in"<<std::endl; 
    io_svc.run(); 
    std::cout<<"work out"<<std::endl; 
    return; 
} 

void stop() { 
    sock.close(); 
    return; 
} 

void start_connect(); 

void handle_connect(const boost::system::error_code& ec) { 
    if (!sock.is_open()) { 
    std::cout<<"Socket closed"<<std::endl; 
    ei++; 
    start_connect(); 
    } else if (ec) { 
    std::cout<<"Connect error: "<<ec.message()<<std::endl; 
    sock.close(); 
    ei++; 
    start_connect(); 
    } else { 
    std::cout<<"Connected to "<<ei->endpoint()<<std::endl; 
    } 
    return; 
} 

void start_connect() { 
    if (ei != boost::asio::ip::tcp::resolver::iterator()) { 
    std::cout<<"Trying "<<ei->endpoint()<<std::endl; 
    sock.async_connect(ei->endpoint(), boost::bind(handle_connect, boost::asio::placeholders::error)); 
    } else { 
    stop(); 
    } 
    return; 
} 

int main(int argc, char* argv[]) { 

    std::string host="192.168.1.15", port="50000"; 

    boost::asio::ip::tcp::resolver r(io_svc); 
    ei = r.resolve(boost::asio::ip::tcp::resolver::query(host, port)); 
    start_connect(); 
    boost::thread* thr = new boost::thread(work); 

    boost::this_thread::sleep_for(boost::chrono::milliseconds(2000)); 

    return 0; 
} 

但我有一個完全不同的結果。當我嘗試連接到一個不存在的主機,大部分的時間是:

Trying 192.168.1.15:50000 
work in 

有時是:

Trying 192.168.1.15:50000 
work in 
Connect error: Operation canceled 
Connect error: Operation canceled 

,很少是:

Trying 192.168.1.15:50000 
work in 
Segmentation fault 

「制定出」永遠不會打印,所以我猜這個例子中的io_service正在做一些事情,但是這與以前的代碼有什麼不同,以及爲什麼我有時只會得到「操作取消」錯誤?

+0

operation_canceled暗示的東西壽命臨終前有一個機會,有調用的回調。另一方面,不需要客戶端中的所有shared_ptrs。這足以讓你的眼睛流血。客戶端已由shared_ptr擁有。這足以控制所有asio對象的生命週期。 –

回答

0

在後臺線程中運行的客戶端應該看起來像這樣。

請注意,我有注意事項包括連接超時。爲此,您希望有一個與async_connect並行運行的截止日期計時器。然後,您必須正確處理交叉案例(提示:取消成功連接的截止日期計時器,並從async_wait中丟棄隨後發生的錯誤)。

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/chrono.hpp> 
#include <thread> 
#include <functional> 

boost::asio::io_service io_svc; 

struct client 
    : std::enable_shared_from_this<client> 
{ 
    using protocol = boost::asio::ip::tcp; 
    using resolver = protocol::resolver; 
    using socket = protocol::socket; 

    using error_code = boost::system::error_code; 

    client(boost::asio::io_service& ios) 
     : ios_(ios) {} 

    void start(std::string const& host, std::string const& service) 
    { 
     auto presolver = std::make_shared<resolver>(get_io_service()); 

     presolver->async_resolve(protocol::resolver::query(host, service), 
           strand_.wrap([self = shared_from_this(), presolver](auto&& ec, auto iter) 
               { 
                self->handle_resolve(ec, presolver, iter); 
               })); 

    } 

private: 
    void 
    handle_resolve(boost::system::error_code const& ec, std::shared_ptr<resolver> presolver, resolver::iterator iter) 
    { 
     if (ec) { 
      std::cerr << "error resolving: " << ec.message() << std::endl; 
     } 
     else { 
      boost::asio::async_connect(sock, iter, strand_.wrap([self = shared_from_this(), 
                    presolver] 
                    (auto&& ec, auto iter) 
                   { 
                    self->handle_connect(ec, iter); 
                    // note - we're dropping presolver here - we don't need it any more 
                   })); 
     } 
    } 

    void handle_connect(error_code const& ec, resolver::iterator iter) 
    { 
     if (ec) { 
      std::cerr << "failed to connect: " << ec.message() << std::endl; 
     } 
     else { 
      auto payload = std::make_shared<std::string>("Hello"); 

      boost::asio::async_write(sock, boost::asio::buffer(*payload), 
            strand_.wrap([self = shared_from_this(), 
                 payload] // note! capture the payload so it continues to exist during async send 
                 (auto&& ec, auto size) 
                { 
                 self->handle_send(ec, size); 
                })); 
     } 
    } 

    void handle_send(error_code const& ec, std::size_t size) 
    { 
     if (ec) { 
      std::cerr << "send failed after " << size << " butes : " << ec.message() << std::endl; 
     } 
     else { 
      // send something else? 
     } 
    } 

    boost::asio::io_service& get_io_service() 
    { 
     return ios_; 
    } 

private: 

    boost::asio::io_service& ios_; 
    boost::asio::strand strand_{get_io_service()}; 
    socket    sock{get_io_service()}; 

}; 

void work() 
{ 
    std::cout << "work in" << std::endl; 
    io_svc.run(); 
    std::cout << "work out" << std::endl; 
    return; 
} 

int main(int argc, char *argv[]) 
{ 

    auto  pclient = std::make_shared<client>(io_svc); 
    std::string host = "192.168.1.15", port = "50000"; 
    pclient->start(host, port); 

    auto run_thread = std::thread(work); 
    if (run_thread.joinable()) 
     run_thread.join(); 

    return 0; 
} 

輸出示例:

work in 
    <time passes>... 
failed to connect: Operation timed out 
work out 
+0

首先 - 感謝您的努力。我並不是想表現出忘恩負義,但我一直在尋找解釋爲什麼io_service.run()在一個特定情況下沒有做任何事情而返回,也許爲什麼在非類版本中,當我試圖找到問題時,我有這種不穩定的行爲。我不是真的希望你做我的編碼,我只是想知道發生了什麼,並明白我在這裏錯過了什麼。 – thu87l

+0

@ thu87l看看我是如何管理生命和你是如何的區別。你的問題在那裏 –

+0

我想我看看問題在哪裏。使用resolver.async_resolve()並傳遞解析器以確保它的活動時間足夠長,以便調用handle_resolve()。然後當查詢解決時,您檢查錯誤。 在我的課中,我使用了endpoint_iterator = resolver.resolve()。我不確定它是如何工作的(例如:[link](http://www.boost.org/doc/libs/1_45_0/doc/html/boost_asio/example/timeouts/async_tcp_client.cpp)),所以我認爲它阻塞直到解決。 – thu87l