2013-12-20 24 views
4

我有一個運行在boost::asio::io_service有一些屬性的對象。類似的東西:如何等待一個asio處理程序?

class Foo 
{ 
    private: 

    // Not an int in my real code, but it doesn't really matter. 
    int m_bar; 
    boost::asio::io_service& m_io_service; 
    boost::asio::strand m_bar_strand; 
}; 

m_bar是從一個即通過鏈m_bar_strand稱爲處理才能使用。這使我不會從這些處理程序中鎖定。

從運行io_service::run()我寫了一個asynchronous_setter,像這樣一個線程之外設置m_bar屬性:

class Foo 
{ 
    public: 
    void async_get_bar(function<void (int)> handler) 
    { 
     m_bar_strand.post(bind(&Foo::do_get_bar, this, handler)); 
    } 

    void async_set_bar(int value, function<void()> handler) 
    { 
     m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler)); 
    } 

    private: 

    void do_get_bar(function<void (int)> handler) 
    { 
     // This is only called from within the m_bar_strand, so we are safe. 

     // Run the handler to notify the caller. 
     handler(m_bar); 
    } 

    void do_set_bar(int value, function<void()> handler) 
    { 
     // This is only called from within the m_bar_strand, so we are safe. 
     m_bar = value; 

     // Run the handler to notify the caller. 
     handler(); 
    } 

    int m_bar; 
    boost::asio::io_service& m_io_service; 
    boost::asio::strand m_bar_strand; 
}; 

這工作完全,但現在我想寫的set_bar同步版本設置價值和回報只有當集合有效。它仍然必須保證有效設置將在m_bar_strand內發生。理想情況下,可重入。

我可以想象解決方案的信號量將被修改從處理程序內,但我所提出的一切似乎hackish,真的不優雅。在Boost/Boost Asio中有什麼可以允許這樣的事情嗎?

您將如何繼續實施此方法?

+1

空隙sync_set_bar(int值,函數處理程序){ m_bar = value; handler();}繼續make m_bar boost :: atomic 。 – IdeaHat

+0

(儘管我不確定在m_bar上需要什麼同步控制,所以我可能只是把你搞砸了)當事情不是異步時,不需要使用asio。 – IdeaHat

+0

@MadScienceDreams:'m_bar'值(無論其類型)只能在** m_bar_strand中運行的處理程序中從**更改(或訪問)。這可以防止不使用互斥體的競爭條件,並且非常好。我只想將異步調用轉換爲同步調用。 – ereOn

回答

4

如果您需要在值同步等待進行設置,然後Boost.Thread的futures可以提供完美的解決方案:

期貨庫提供的同步處理未來價值的一種手段,無論這些值由另一個線程或單個線程響應外部刺激或按需生成。

總之,創建了一個boost::promise並允許在其上設置一個值。該值稍後可以通過關聯的boost::future檢索。這是一個基本的例子:這種方法

boost::promise<int> promise; 
boost::unique_future<int> future = promise.get_future(); 

// start asynchronous operation that will invoke future.set_value(42) 
... 

assert(future.get() == 42); // blocks until future has been set. 

兩個等顯着優點:

  • future是C++ 11的一部分。
  • 異常甚至可以通過promise::set_exception()傳遞給future,支持向調用者提供異常或錯誤的優雅方式。

這裏是基於原始代碼的完整示例:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 

class Foo 
{ 
public: 

    Foo(boost::asio::io_service& io_service) 
    : m_io_service(io_service), 
     m_bar_strand(io_service) 
    {} 

public: 

    void async_get_bar(boost::function<void(int)> handler) 
    { 
    m_bar_strand.post(bind(&Foo::do_get_bar, this, handler)); 
    } 

    void async_set_bar(int value, boost::function<void()> handler) 
    { 
    m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler)); 
    } 

    int bar() 
    { 
    typedef boost::promise<int> promise_type; 
    promise_type promise; 

    // Pass the handler to async operation that will set the promise. 
    void (promise_type::*setter)(const int&) = &promise_type::set_value; 
    async_get_bar(boost::bind(setter, &promise, _1)); 

    // Synchronously wait for promise to be fulfilled. 
    return promise.get_future().get(); 
    } 

    void bar(int value) 
    { 
    typedef boost::promise<void> promise_type; 
    promise_type promise; 

    // Pass the handler to async operation that will set the promise. 
    async_set_bar(value, boost::bind(&promise_type::set_value, &promise)); 

    // Synchronously wait for the future to finish. 
    promise.get_future().wait(); 
    } 

private: 

    void do_get_bar(boost::function<void(int)> handler) 
    { 
    // This is only called from within the m_bar_strand, so we are safe. 

    // Run the handler to notify the caller. 
    handler(m_bar); 
    } 

    void do_set_bar(int value, boost::function<void()> handler) 
    { 
    // This is only called from within the m_bar_strand, so we are safe. 
    m_bar = value; 

    // Run the handler to notify the caller. 
    handler(); 
    } 

    int m_bar; 
    boost::asio::io_service& m_io_service; 
    boost::asio::strand m_bar_strand; 
}; 

int main() 
{ 
    boost::asio::io_service io_service; 
    boost::asio::io_service::work work(io_service); 
    boost::thread t(
     boost::bind(&boost::asio::io_service::run, boost::ref(io_service))); 

    Foo foo(io_service); 
    foo.bar(21); 
    std::cout << "foo.bar is " << foo.bar() << std::endl; 
    foo.bar(2 * foo.bar()); 
    std::cout << "foo.bar is " << foo.bar() << std::endl; 

    io_service.stop(); 
    t.join(); 
} 

,其提供以下輸出:

foo.bar is 21 
foo.bar is 42 
+0

我沒有想到'future':很可能是更好的解決方案!但是,這段代碼是否創建了一個線程來等待未來? – ereOn

+0

顯然,它沒有......當你想到它時這是有道理的。謝謝你的訣竅!絕對比我自己製作的解決方案更好的風格。也許值得一提的是,調用setter/getter **內部處理程序可能會死鎖(如果被調用的處理程序在與調用處理程序相同的線程內運行)。 – ereOn

0

當在async_set_bar()中設置值時,可以使用管道通知同步方法。警告,下面的代碼是腦編譯並可能有錯誤,但它應該得到整個

#include <boost/asio.hpp> 

#include <iostream> 
#include <thread>                              

class Foo                                
{ 
public:                                 
    Foo(boost::asio::io_service& io_service) :                       
     _bar(0), 
     _io_service(io_service),                          
     _strand(_io_service),                           
     _readPipe(_io_service), 
     _writePipe(_io_service) 
    { 
     boost::asio::local::connect_pair(_readPipe, _writePipe); 
    } 

    void set_async(int v) { 
     _strand.post([=] 
      { 
       _bar = v; 
       std::cout << "sending " << _bar << std::endl; 
       _writePipe.send(boost::asio::buffer(&_bar, sizeof(_bar))); 
      } 
      ); 
    } 

    void set_sync(int v) { 
     this->set_async(v); 
     int value; 
     _readPipe.receive(boost::asio::buffer(&value, sizeof(value))); 
     std::cout << "set value to " << value << std::endl; 
    } 


private: 
    int _bar; 
    boost::asio::io_service& _io_service; 
    boost::asio::io_service::strand _strand; 
    boost::asio::local::stream_protocol::socket _readPipe; 
    boost::asio::local::stream_protocol::socket _writePipe; 
}; 

int 
main() 
{ 
    boost::asio::io_service io_service; 
    boost::asio::io_service::work w(io_service); 
    std::thread t([&]{ io_service.run(); }); 
    Foo f(io_service); 
    f.set_sync(20); 
    io_service.stop(); 
    t.join(); 
} 

點,如果你不能使用C++ 11個lambda表達式,與boost::bind替換他們和一些完成處理的方法。

0

這是我想出了:

class synchronizer_base 
{ 
    protected: 
     synchronizer_base() : 
      m_has_result(false), 
      m_lock(m_mutex) 
     { 
     } 

     void wait() 
     { 
      while (!m_has_result) 
      { 
       m_condition.wait(m_lock); 
      } 
     } 

     void notify_result() 
     { 
      m_has_result = true; 
      m_condition.notify_all(); 
     } 

    private: 

     boost::atomic<bool> m_has_result; 
     boost::mutex m_mutex; 
     boost::unique_lock<boost::mutex> m_lock; 
     boost::condition_variable m_condition; 
}; 

template <typename ResultType = void> 
class synchronizer : public synchronizer_base 
{ 
    public: 

     void operator()(const ResultType& result) 
     { 
      m_result = result; 

      notify_result(); 
     } 

     ResultType wait_result() 
     { 
      wait(); 

      return m_result; 
     } 

    private: 

     ResultType m_result; 
}; 

template <> 
class synchronizer<void> : public synchronizer_base 
{ 
    public: 

     void operator()() 
     { 
      notify_result(); 
     } 

     void wait_result() 
     { 
      wait(); 
     } 
}; 

我可以使用它,這種方式:

class Foo 
{ 
    public: 
    void async_get_bar(function<void (int)> handler) 
    { 
     m_bar_strand.post(bind(&Foo::do_get_bar, this, value, handler)); 
    } 

    void async_set_bar(int value, function<void()> handler) 
    { 
     m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler)); 
    } 

    int get_bar() 
    { 
     synchronizer<int> sync; 

     async_get_bar(boost::ref(sync)); 

     return sync.wait_result(); 
    } 

    void set_bar(int value) 
    { 
     synchronizer<void> sync; 

     async_set_bar(value, boost::ref(sync)); 

     sync.wait_result(); 
    } 
}; 

boost::ref是必要的,因爲synchronizer的實例是不可複製的。這可以通過將synchronizer包裝在其他一些容器類中來避免,但是我很好地使用該解決方案。

注意:不是從處理程序中調用這樣的「同步」函數,或者它可能只是死鎖!

相關問題