2012-06-05 55 views
4

我在代碼中發現了這個奇怪的錯誤,下面是我設法整理出來的自包含測試案例。(問題標題:boost::asio 的錯誤?task_io_service 在 io_service 之前被銷燬)

#include <memory> 
#include <thread> 
#include <stack> 
#include <system_error> 

#include <boost/asio.hpp> 

using boost::asio::io_service; 
using std::placeholders::_1; 

// Convenience wrapper that owns one io_service together with the threads
// that run it.
class async_service
{
public:
    // Creates the service without starting any thread.
    async_service();
    // Creates the service and calls spawn() number_threads times.
    async_service(size_t number_threads);
    // Stops the io_service and joins every thread started by spawn().
    ~async_service();

    // Not copyable.
    async_service(const async_service&) = delete;
    void operator=(const async_service&) = delete;

    // Starts one more thread running the io_service; the first call also
    // creates the io_service::work object.
    void spawn();
    // Deletes the io_service::work object created by spawn(), if any.
    void shutdown();

    // Access to the wrapped io_service.
    io_service& get_service();
    const io_service& get_service() const;

private:
    // Declared first, so it is constructed first and destroyed last among
    // the members.
    io_service service_;
    // Raw owning pointer: allocated by spawn() with new; shutdown() deletes it.
    io_service::work* work_;
    // Threads started by spawn().
    std::vector<std::thread> threads_;
};

async_service::async_service() 
    : work_(nullptr) 
{ 
} 

async_service::async_service(size_t number_threads) 
    : work_(nullptr) 
{ 
    for (size_t i = 0; i < number_threads; ++i) 
     spawn(); 
} 

// Stops the io_service, waits for every thread started by spawn() and
// releases the work object.
async_service::~async_service()
{
    std::cout << __PRETTY_FUNCTION__ << std::endl;
    // Ask run() to return in the service threads so the joins below can finish.
    service_.stop();
    for (std::thread& t: threads_)
    {
        // Joining a non-joinable thread throws, and a destructor must not
        // throw, so guard the call.
        if (t.joinable())
            t.join();
    }
    // spawn() allocates work_ with new; previously it leaked whenever
    // shutdown() had not been called before destruction. Deleting a null
    // pointer is a no-op, so this is safe after shutdown() as well.
    delete work_;
    work_ = nullptr;
}

// Thread entry point used by async_service::spawn(): calls run() on the
// io_service it is given.
void run_service(io_service* service)
{
    io_service& target = *service;
    target.run();
}

// Adds one thread that runs the io_service. The work object is created on
// the first call only (or again after shutdown() has cleared it).
void async_service::spawn()
{
    if (work_ == nullptr)
    {
        work_ = new io_service::work(service_);
    }
    std::thread runner(run_service, &service_);
    threads_.push_back(std::move(runner));
}
void async_service::shutdown() 
{ 
    delete work_; 
    work_ = nullptr; 
} 

// Mutable access to the wrapped io_service.
io_service& async_service::get_service()
{
    return this->service_;
}

// Read-only access to the wrapped io_service.
const io_service& async_service::get_service() const
{
    return this->service_;
}

// -------------------------------------------------------------- 

// One-shot event relay: every handler registered through subscribe() is
// invoked once by the next relay() and then dropped.
//
// Both operations are dispatched through a strand built on the io_service of
// the async_service passed to the constructor. The bound calls hold a
// shared_ptr obtained from shared_from_this(), so instances must be owned by
// a std::shared_ptr.
template <typename... Args>
class subscriber
    : public std::enable_shared_from_this<subscriber<Args...>>
{
public:
    using handler_type = std::function<void (Args...)>;
    using ptr = std::shared_ptr<subscriber<Args...>>;

    subscriber(async_service& service)
        : strand_(service.get_service())
    {
    }

    // Registers handle (through the strand) for the next relay().
    void subscribe(handler_type handle)
    {
        ptr self = this->shared_from_this();
        strand_.dispatch(
            std::bind(&subscriber<Args...>::do_subscribe, self, handle));
    }

    // Invokes (through the strand) every registered handler with params.
    void relay(Args... params)
    {
        ptr self = this->shared_from_this();
        strand_.dispatch(
            std::bind(&subscriber<Args...>::do_relay,
                self, std::forward<Args>(params)...));
    }

private:
    using registry_stack = std::stack<handler_type>;

    void do_subscribe(handler_type handle)
    {
        registry_.push(handle);
    }

    void do_relay(Args... params)
    {
        // Detach the current handlers first, so that anything registered
        // while they run lands in the now-empty registry_ and waits for the
        // next relay().
        registry_stack pending;
        pending.swap(registry_);
        for (; !pending.empty(); pending.pop())
        {
            pending.top()(params...);
        }
        assert(pending.empty());
    }

    io_service::strand strand_;
    registry_stack registry_;
};

// -------------------------------------------------------- 

class lala_channel_proxy 
    : public std::enable_shared_from_this<lala_channel_proxy> 
{ 
public: 
    typedef std::function<void (const std::error_code&)> receive_inventory_handler; 

    lala_channel_proxy(async_service& service) 
     : strand_(service.get_service()) 
    { 
     inventory_subscriber_ = 
      std::make_shared<inventory_subscriber_type>(service); 
    } 

    void start() 
    { 
     read_header(); 
    } 

    void subscribe_inventory(receive_inventory_handler handle_receive) 
    { 
     inventory_subscriber_->subscribe(handle_receive); 
    } 

    typedef subscriber<const std::error_code&> inventory_subscriber_type; 

    void read_header() 
    { 
     strand_.post(
      std::bind(&lala_channel_proxy::handle_read_header, 
       shared_from_this(), boost::system::error_code(), 0)); 
    } 

    void handle_read_header(const boost::system::error_code& ec, 
     size_t bytes_transferred) 
    { 
     std::cout << "inventory ----------" << std::endl; 
     inventory_subscriber_->relay(std::error_code()); 
     sleep(1.0); 
     read_header(); 
    } 

    io_service::strand strand_; 
    inventory_subscriber_type::ptr inventory_subscriber_; 
}; 

typedef std::shared_ptr<lala_channel_proxy> lala_channel_proxy_ptr; 

class lala_channel 
{ 
public: 
    lala_channel(async_service& service) 
    { 
     lala_channel_proxy_ptr proxy = 
      std::make_shared<lala_channel_proxy>(service); 
     proxy->start(); 
     //weak_proxy_ = proxy; 
     strong_proxy_ = proxy; 
    } 
    void subscribe_inventory(
     lala_channel_proxy::receive_inventory_handler handle_receive) 
    { 
     lala_channel_proxy_ptr proxy = strong_proxy_; 
     proxy->subscribe_inventory(handle_receive); 
    } 
    lala_channel_proxy_ptr strong_proxy_; 
    // Normally this has a weak pointer to the channel pimpl to allow 
    // it to die, but whether it uses a weak_ptr or shared_ptr makes 
    // no difference. 
    //std::weak_ptr<channel_proxy> weak_proxy_; 
}; 

typedef std::shared_ptr<lala_channel> lala_channel_ptr; 
//typedef lala_channel_proxy_ptr lala_channel_ptr; 

// Test-case session that ties three services together: it subscribes to a
// channel's inventory notifications on the main service and posts work to
// strands of the mempool and disk services.
//
// Instances must be owned by a std::shared_ptr, because start() and
// inventory() call shared_from_this().
class session
    : public std::enable_shared_from_this<session>
{
public:
    typedef std::function<void (const std::error_code&)> completion_handler;

    // service:         service the channel and strand_ are created on.
    // mempool_service: service behind txpool_strand_.
    // disk_service:    service behind chain_strand_.
    //
    // The initializers are listed in member declaration order, which is the
    // order in which they actually run.
    session(async_service& service, async_service& mempool_service,
            async_service& disk_service)
        : service_(service),
          txpool_strand_(mempool_service.get_service()),
          strand_(service.get_service()),
          chain_strand_(disk_service.get_service())
    {
    }

    // Creates a channel, subscribes inventory() to it and queues 500 slow
    // jobs on the disk strand.
    void start()
    {
        auto this_ptr = shared_from_this();
        lala_channel_ptr node =
            std::make_shared<lala_channel>(service_);
        // The bound handler holds both the session and the channel.
        node->subscribe_inventory(
            std::bind(&session::inventory, this_ptr, _1, node));
        for (size_t i = 0; i < 500; ++i)
        {
            chain_strand_.post(
                []()
                {
                    std::cout << "HERE!" << std::endl;
                    sleep(2);
                });
        }
    }

private:
    // Inventory notification handler: on error it reports ec and stops;
    // otherwise it posts an empty job to the mempool strand and subscribes
    // itself again for the next notification.
    void inventory(const std::error_code& ec, lala_channel_ptr node)
    {
        if (ec)
        {
            std::cerr << ec.message() << std::endl;
            return;
        }
        auto this_ptr = shared_from_this();
        // NOTE: txpool_strand_ was built on mempool_service's io_service, so
        // that service must still be alive whenever this handler runs.
        txpool_strand_.post([]() {});
        node->subscribe_inventory(
            std::bind(&session::inventory, this_ptr, _1, node));
    }

    async_service& service_;
    io_service::strand txpool_strand_;
    io_service::strand strand_;
    io_service::strand chain_strand_;
};

// Reproduction driver: starts three single-threaded services, runs a session
// across them for about three seconds and then lets the services go out of
// scope.
int main() 
{ 
    // First level 
    { 
     // The crash only reproduces with this declaration order. Locals are
     // destroyed in reverse order of declaration, so here mempool_service is
     // destroyed first, then disk_service, then network_service. Each
     // destructor stops and joins only its own thread, so the
     // network_service thread is still running while mempool_service is
     // being torn down.
     async_service network_service(1), disk_service(1), mempool_service(1); 
     //async_service network_service(1), mempool_service(1), disk_service(1); 
     //async_service disk_service(1), mempool_service(1), network_service(1); 
     //async_service disk_service(1), network_service(1), mempool_service(1); 
     //async_service mempool_service(1), disk_service(1), network_service(1); 
     //async_service mempool_service(1), network_service(1), disk_service(1); 

     // Second level 
     { 
      // The local shared_ptr goes away at the end of this scope; the
      // session is expected to be kept alive by the handlers queued on the
      // io_service, which hold their own shared_ptr to it.
      auto s = std::make_shared<session>(network_service, mempool_service, disk_service); 
      s->start(); 
     } 
     //network_service.shutdown(); 
     //disk_service.shutdown(); 
     //mempool_service.shutdown(); 
     // Let the handlers run for a while before the services are destroyed.
     sleep(3); 
    // Never gets past here: the crash happens while the services above are
    // being destroyed.
    } 
    std::cout << "Exiting..." << std::endl; 
    return 0; 
} 

當我運行它,我得到這個:

$ g++ -std=c++0x /tmp/ideone_y6OlI.cpp -lboost_system -pthread -ggdb 
$ gdb a.out 
GNU gdb (Ubuntu/Linaro 7.4-2012.04-0ubuntu2) 7.4-2012.04 
Copyright (C) 2012 Free Software Foundation, Inc. 
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html> 
This is free software: you are free to change and redistribute it. 
There is NO WARRANTY, to the extent permitted by law. Type "show copying" 
and "show warranty" for details. 
This GDB was configured as "x86_64-linux-gnu". 
For bug reporting instructions, please see: 
<http://bugs.launchpad.net/gdb-linaro/>... 
Reading symbols from /home/genjix/src/brokenlibbtc/a.out...done. 
(gdb) r 
Starting program: /home/genjix/src/brokenlibbtc/a.out 
[Thread debugging using libthread_db enabled] 
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". 
[New Thread 0x7ffff6deb700 (LWP 28098)] 
[New Thread 0x7ffff65ea700 (LWP 28099)] 
[New Thread 0x7ffff5de9700 (LWP 28100)] 
inventory ---------- 
HERE! 
inventory ---------- 
HERE! 
inventory ---------- 
async_service::~async_service() 
async_service::~async_service() 
[Thread 0x7ffff5de9700 (LWP 28100) exited] 

Program received signal SIGSEGV, Segmentation fault. 
[Switching to Thread 0x7ffff6deb700 (LWP 28098)] 
0x0000000000405873 in boost::asio::detail::task_io_service::wake_one_idle_thread_and_unlock (this=0x6255e0, lock=...) at /usr/include/boost/asio/detail/impl/task_io_service.ipp:461 
461  first_idle_thread_ = idle_thread->next; 

使用 Boost 1.48 和 1.49 都會出現同樣的情況。

我想知道爲什麼會發生這種情況。它只發生在這個非常特殊的配置中。如果我改變了什麼,那麼錯誤不會發生。

async_service 是 io_service 的便捷包裝。奇怪的是,如果我把成員 io_service 改爲指針(io_service*),並且從不 delete 這個 io_service,那麼錯誤就不會發生......但這按理說不應該有任何影響吧?

如果您查看main()中的源代碼,則會創建3個async_service對象。它們中的每一個管理單個io_service的生命週期。

 // The crash only reproduces with this declaration order. Locals are
     // destroyed in reverse order of declaration, so here mempool_service is
     // destroyed first, then disk_service, then network_service.
     async_service network_service(1), disk_service(1), mempool_service(1); 
     //async_service network_service(1), mempool_service(1), disk_service(1); 
     //async_service disk_service(1), mempool_service(1), network_service(1); 
     //async_service disk_service(1), network_service(1), mempool_service(1); 
     //async_service mempool_service(1), disk_service(1), network_service(1); 
     //async_service mempool_service(1), network_service(1), disk_service(1); 

subscriber 類用於訂閱:在特定事件發生時調用已註冊的處理函數。session 和 channel 相關的代碼是從一個更大的程序中改編而來的,所以看起來可能有些糾結、混亂。

+2

這看起來像是一個對象生命期問題。可能你的 `io_service` 已經被銷燬,而尚未完成的操作仍在等待中。 – Chad

回答

3

其中一個問題是:session::inventory 在由構造函數第一個參數(出錯的情況下是 network_service)的線程執行時,會嘗試訪問用第二個參數(mempool_service)初始化的 strand。

// Excerpt of session::inventory(): on error it reports ec and returns;
// otherwise it posts an empty job to txpool_strand_ and subscribes itself
// again for the next inventory notification.
void inventory(const std::error_code& ec, lala_channel_ptr node) 
{ 
    if (ec) 
    { 
     std::cerr << ec.message() << std::endl; 
     return; 
    } 
    auto this_ptr = shared_from_this(); 
    // txpool_strand_ was initialized from mempool_service, so this post needs
    // that service's io_service to still exist when the handler runs.
    txpool_strand_.post([]() {}); // <-- one problem is here. 
    node->subscribe_inventory(
     std::bind(&session::inventory, this_ptr, _1, node)); 
} 

鑑於銷燬的順序,此時 mempool_service 已經被銷燬,因此這次訪問會在 post 執行過程中的某個地方失敗。

+0

這是問題所在。謝謝 – genjix

相關問題