2014-10-26 60 views
3

我使用的是Boost example中提供的代碼。升壓asio TCP異步服務器不是異步?

服務器一次只接受1個連接。這意味着,直到當前的連接關閉,纔會有新的連接。

如何讓上面的代碼同時接受無限連接?

#include <cstdlib> 
#include <iostream> 
#include <memory> 
#include <utility> 
#include <boost/asio.hpp> 

using boost::asio::ip::tcp; 

class session 
    : public std::enable_shared_from_this<session> 
{ 
public: 
    session(tcp::socket socket) 
    : socket_(std::move(socket)) 
    { 
    } 

    void start() 
    { 
    do_read(); 
    } 

private: 
    void do_read() 
    { 
    auto self(shared_from_this()); 
    socket_.async_read_some(boost::asio::buffer(data_, max_length), 
     [this, self](boost::system::error_code ec, std::size_t length) 
     { 
      if (!ec) 
      { 
      boost::this_thread::sleep(boost::posix_time::milliseconds(10000));//sleep some time 
      do_write(length); 
      } 
     }); 
    } 

    void do_write(std::size_t length) 
    { 
    auto self(shared_from_this()); 
    boost::asio::async_write(socket_, boost::asio::buffer(data_, length), 
     [this, self](boost::system::error_code ec, std::size_t /*length*/) 
     { 
      if (!ec) 
      { 
      do_read(); 
      } 
     }); 
    } 

    tcp::socket socket_; 
    enum { max_length = 1024 }; 
    char data_[max_length]; 
}; 

class server 
{ 
public: 
    server(boost::asio::io_service& io_service, short port) 
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)), 
     socket_(io_service) 
    { 
    do_accept(); 
    } 

private: 
    void do_accept() 
    { 
    acceptor_.async_accept(socket_, 
     [this](boost::system::error_code ec) 
     { 
      if (!ec) 
      { 
      std::make_shared<session>(std::move(socket_))->start(); 
      } 

      do_accept(); 
     }); 
    } 

    tcp::acceptor acceptor_; 
    tcp::socket socket_; 
}; 

int main(int argc, char* argv[]) 
{ 
    try 
    { 
    if (argc != 2) 
    { 
     std::cerr << "Usage: async_tcp_echo_server <port>\n"; 
     return 1; 
    } 

    boost::asio::io_service io_service; 

    server s(io_service, std::atoi(argv[1])); 

    io_service.run(); 
    } 
    catch (std::exception& e) 
    { 
    std::cerr << "Exception: " << e.what() << "\n"; 
    } 

    return 0; 
} 

正如你所看到的,程序在等待睡眠,並且在此期間不會搶到第二個連接。

+1

如果您使用的是代碼的例子,那麼它將接受多個連接。 async_accept的AcceptHandler啓動另一個async_accept操作。 – 2014-10-26 17:05:36

+0

您究竟確實一次只接受1個連接?你測試過了嗎?如果是這樣,請解釋你如何測試以及你得到了什麼結果。你是否通過代碼檢查確定了它?如果是這樣,解釋你是如何得出這個結論的。 – 2014-10-26 18:01:56

+0

@Tanner Sansbury:我正在使用一個非常簡單的睡眠來驗證它是否接受多個連接的示例代碼。 – SpeedCoder 2014-10-26 18:29:26

回答

1

你正在執行一個同步的等待內部處理程序,該程序在爲你的io_service服務的唯一線程上運行。這使Asio等待調用處理程序來處理任何新的請求。

  1. 使用deadline_timewait_async,或者

    void do_read() { 
        auto self(shared_from_this()); 
        socket_.async_read_some(boost::asio::buffer(data_, max_length), 
              [this, self](boost::system::error_code ec, std::size_t length) { 
         if (!ec) { 
          timer_.expires_from_now(boost::posix_time::seconds(1)); 
          timer_.async_wait([this, self, length](boost::system::error_code ec) { 
            if (!ec) 
             do_write(length); 
           }); 
         } 
        }); 
    } 
    

    timer_場是session

  2. 一個boost::asio::deadline_timer成員作爲-窮人的解決方案增加更多的線程(這只是意味着如果有更多的請求到達的時間比有線程處理它們的時間多,它將一直阻塞,直到第一個線程變得可用於接收新的請求)

    boost::thread_group tg; 
    for (int i=0; i < 10; ++i) 
        tg.create_thread([&]{ io_service.run(); }); 
    
    tg.join_all(); 
    
+0

你能解釋一下如何在boost示例中初始化截止時間定時器嗎?這對我來說似乎是不可能的... – SpeedCoder 2014-10-26 23:32:37

+0

@SpeedCoder sure:http://coliru.stacked-crooked.com/a/82c7d0b7b073ca8d – sehe 2014-10-27 07:42:25

+0

我理解多線程解決方案的工作原理,但我不明白我該如何實現一個與截止日期計時器。在類服務器中,我添加一個名爲'boost :: asio :: io_service * io_service;'(這是一個指向io_service的指針),然後在調用'timer_.expires ...'之前,我初始化定時器:'deadline_timer timer_( * io_service);'這個編譯器正常,但我總是得到錯誤:'由於線程退出或應用程序請求,I/O操作已被中止。 – SpeedCoder 2014-10-27 08:10:18

2

無論原代碼和修改後的代碼是異步的,並接受多個連接。如在下面的代碼段中可以看出,async_accept操作的AcceptHandler啓動另一個async_accept操作,形成一個異步循環:

 .-----------------------------------. 
     V         | 
void server::do_accept()     | 
{           | 
    acceptor_.async_accept(...,    | 
     [this](boost::system::error_code ec) | 
     {          | 
     // ...        | 
     do_accept(); ----------------------' 
     }); 
} 

sleep()session的ReadHandler內使一個線程運行io_service阻止直到睡眠完成。因此,該計劃將無所事事。但是,這不會導致任何未完成的操作被取消。爲了更好地理解異步操作和io_service,請考慮閱讀this答案。


這裏是一個示例demonstrating服務器處理多個連接。它產生一個創建5個客戶端套接字並將它們連接到服務器的線程。

#include <cstdlib> 
#include <iostream> 
#include <memory> 
#include <utility> 
#include <vector> 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 

using boost::asio::ip::tcp; 

class session 
    : public std::enable_shared_from_this<session> 
{ 
public: 
    session(tcp::socket socket) 
    : socket_(std::move(socket)) 
    { 
    } 

    ~session() 
    { 
    std::cout << "session ended" << std::endl; 
    } 

    void start() 
    { 
    std::cout << "session started" << std::endl; 
    do_read(); 
    } 

private: 
    void do_read() 
    { 
    auto self(shared_from_this()); 
    socket_.async_read_some(boost::asio::buffer(data_, max_length), 
     [this, self](boost::system::error_code ec, std::size_t length) 
     { 
      if (!ec) 
      { 
      do_write(length); 
      } 
     }); 
    } 

    void do_write(std::size_t length) 
    { 
    auto self(shared_from_this()); 
    boost::asio::async_write(socket_, boost::asio::buffer(data_, length), 
     [this, self](boost::system::error_code ec, std::size_t /*length*/) 
     { 
      if (!ec) 
      { 
      do_read(); 
      } 
     }); 
    } 

    tcp::socket socket_; 
    enum { max_length = 1024 }; 
    char data_[max_length]; 
}; 

class server 
{ 
public: 
    server(boost::asio::io_service& io_service, short port) 
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)), 
     socket_(io_service) 
    { 
    do_accept(); 
    } 

private: 
    void do_accept() 
    { 
    acceptor_.async_accept(socket_, 
     [this](boost::system::error_code ec) 
     { 
      if (!ec) 
      { 
      std::make_shared<session>(std::move(socket_))->start(); 
      } 

      do_accept(); 
     }); 
    } 

    tcp::acceptor acceptor_; 
    tcp::socket socket_; 
}; 

int main(int argc, char* argv[]) 
{ 
    try 
    { 
    if (argc != 2) 
    { 
     std::cerr << "Usage: async_tcp_echo_server <port>\n"; 
     return 1; 
    } 

    boost::asio::io_service io_service; 

    auto port = std::atoi(argv[1]); 
    server s(io_service, port); 

    boost::thread client_main(
     [&io_service, port] 
     { 
      tcp::endpoint server_endpoint(
       boost::asio::ip::address_v4::loopback(), port); 

      // Create and connect 5 clients to the server. 
      std::vector<std::shared_ptr<tcp::socket>> clients; 
      for (auto i = 0; i < 5; ++i) 
      { 
       auto client = std::make_shared<tcp::socket>(
        std::ref(io_service)); 
       client->connect(server_endpoint); 
       clients.push_back(client); 
      } 

      // Wait 2 seconds before destroying all clients. 
      boost::this_thread::sleep(boost::posix_time::seconds(2)); 
     }); 

    io_service.run(); 
    client_main.join(); 
    } 
    catch (std::exception& e) 
    { 
    std::cerr << "Exception: " << e.what() << "\n"; 
    } 

    return 0; 
} 

輸出:

session started 
session started 
session started 
session started 
session started 
session ended 
session ended 
session ended 
session ended 
session ended