2017-04-13 69 views
1

我想在同一時間使用Boost.Asio的鏈和優先包裝。如何結合束包裝和優先包裝Boost Asio

之前,我寫我的代碼,我已經閱讀了以下信息:

Boost asio priority and strand

boost::asio and Active Object

http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531

Why do I need strand per connection when using boost::asio?

我想使用的包裝方法,因爲我想使用各種異步API,如async_read,a sync_write和async_connect。 根據http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531,似乎可以合併優先包裝和鏈包裝。

所以我寫了基於下面的示例代碼:

http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp

這裏是我的代碼:

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     wrapped_handler(handler_priority_queue& q, int p, Handler h) 
      : queue_(q), priority_(p), handler_(std::move(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_PRIORITY 
       ) 
#endif 
#if ENABLE_STRAND 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

的包裝由下列宏啓用:

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

當兩個宏都啓用時,我得到以下結果T:

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
execute(3) 
execute(2) 
execute(1) 
execute(0) 
before run_one() 
before run_one() 
before run_one() 

我希望我得到了

[called] priority,thread_id 

輸出

[called] 1,140512649541376 

,但我沒有得到它。

看起來功能​​,function_()被調用,但不調用wrapped_handler::operator()。 (該功能​​從pq.execute_all();在我的代碼調用。)

void execute() { 
    std::cout << "execute(" << priority_ << ")" << std::endl; 
    function_(); // It is called. 
} 

template <typename Handler> 
class wrapped_handler { 
public: 

    template <typename... Args> 
    void operator()(Args&&... args) { // It is NOT called 
     std::cout << "operator() " << std::endl; 
     handler_(std::forward<Args>(args)...); 
    } 

我跟蹤的序列function_()後調用。

下列函數調用:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94

在功能 bool strand_service::do_dispatch(implementation_type& impl, operation* op)

然後,操作op不叫,但被推入隊列INT以下行:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L111

我不知道爲什麼function_()被調度到strand_service。我認爲,鏈包裝已經在以下點在我的代碼unwraped:

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

如果我只啓用了優先級的包裝,我得到了以下的結果。 看來,這是按我的預期工作。

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
operator() 
[called] 4,140512649541376 
execute(3) 
operator() 
[called] 3,140512649541376 
execute(2) 
operator() 
[called] 2,140512649541376 
execute(1) 
operator() 
[called] 1,140512649541376 
execute(0) 
operator() 
[called] 0,140512649541376 
before run_one() 
before run_one() 
before run_one() 

如果我只啓用了strand wrapper,我得到了以下結果。 看來,我也在按預期工作。

before run_one() 
[called] 0,140127385941760 
before poll_one() 
[called] 1,140127385941760 
[called] 2,140127385941760 
[called] 3,140127385941760 
[called] 4,140127385941760 
before execute_all() 
before run_one() 
before run_one() 
before run_one() 

任何想法?

回答

1

我解決了這個問題。

我不知道爲什麼function_()被調度到strand_service。我認爲,鏈包裝已經在以下點在我的代碼unwraped:

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

參數f是原來的處理程序。這意味着優先級隊列包裹和鏈處理程序。包裝線在外面。所以當調用f時,它被調度到strand_service。這個過程發生在同一個strand_service中,所以處理程序不會被調用。

爲了解決這個問題,添加h->handler_入優先級隊列,而不是f如下:

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

handler_是類模板wrapped_handler的成員變量。它擁有未被包裝的處理程序。

下面是完整的代碼:

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     template <typename HandlerArg> 
     wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h) 
      : queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_STRAND 
       ) 
#endif 
#if ENABLE_PRIORITY 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

這裏是輸出:

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
[called] 4,139903315736320 
execute(3) 
[called] 3,139903315736320 
execute(2) 
[called] 2,139903315736320 
execute(1) 
[called] 1,139903315736320 
execute(0) 
[called] 0,139903315736320 
before run_one() 
before run_one() 
before run_one()