2012-06-12 103 views
4

我想用boost::asio來設置一個線程池。 我的問題是:如何將特定數據附加到創建的每個線程,以及如何管理單個輸出?使用線程池進行仿真:boost-thread和boost-asio

更具體地說,我寫了一個類Simulation,它通過輸入一些參數的方法來執行模擬。 該類包含計算所需的全部數據。 由於數據不是太大,我想複製它以便在池的每個線程中使用類Simulation的不同實例。

我願做這樣的事情: (建立一個線程池在此說明:SOAsio recipes

class ParallelSimulation 
{ 
    public: 
    static const std::size_t N = 10; 

    protected: 
    std::vector< boost::shared_ptr<Simulation> > simuInst; // N copy of a reference instance. 

    public: 

    ... 

    // Simulation with a large (>>N) number of inputs 
    void eval(std::vector<SimulationInput> inputs) 
    { 
     // Creation of the pool using N threads 
     asio::io_service io_service; 
     asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < N; ++i) 
     threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

     // Here ? Attaching the duplicates instances of class Simulation ? 

     // Adding tasks 
     for(std::size_t i = 0, i_end = inputs.size(); i<i_end; ++i) 
     io_service.post(...); // add simulation with inputs[i] to the queue 

     // How to deal with outputs ? 

     // End of the tasks 
     io_service.stop(); 
     threads.join_all(); 
    } 
}; 

也許用於建立一個線程池(使用boost::asio),該技術是不適應我的問題。你有什麼建議嗎? 謝謝。

回答

1

這裏是我的研究成果!

分佈式仿真基於主類DistributedSimulation使用兩個實現類:impl::m_io_serviceimpl::dispatcher

boost::asio線程池基於將io_service::run()方法附加到不同的線程。
這個想法是重新定義這個方法,幷包含一個機制來識別當前線程。以下解決方案基於boost::uuid的線程本地存儲boost::thread_specific_ptr閱讀特雷斯的評論後,我認爲使用boost::thread::id識別線程是一個更好的解決方案(但相當於並不太不同)。
最後,另一個類用於將輸入數據分派給類Simulation的實例。這個類創建了幾個相同類Simulation的實例,並使用它們來計算每個線程的結果。下面

namespace impl { 

    // Create a derived class of io_service including thread specific data (a unique identifier of the thread) 
    struct m_io_service : public boost::asio::io_service 
    { 
    static boost::thread_specific_ptr<boost::uuids::uuid> ptrSpec_; 

    std::size_t run() 
    { 
     if(ptrSpec_.get() == 0) 
     ptrSpec_.reset(new boost::uuids::uuid(boost::uuids::random_generator()()) ); 

     return boost::asio::io_service::run(); 
    } 
    }; 


    // Create a class that dispatches the input data over the N instances of the class Simulation 
    template <class Simulation> 
    class dispatcher 
    { 
    public: 
     static const std::size_t N = 6; 

     typedef Simulation::input_t input_t; 
     typedef Simulation::output_t output_t; 

     friend DistributedSimulation; 

    protected: 
     std::vector< boost::shared_ptr<Simulation> > simuInst; 
     std::vector<boost::uuids::uuid>   map; 

    public: 

     // Constructor, creating the N instances of class Simulation 
     dispatcher(const Simulation& simuRef) 
     { 
     simuInst.resize(N); 
     for(std::size_t i=0; i<N; ++i) 
      simuInst[i].reset(simuRef.clone()); 
     } 

     // Record the unique identifiers and do the calculation using the right instance of class Simulation 
     void dispatch(const Simulation::input_t& in ) 
     { 
     if(map.size() == 0) { 
      map.push_back(*m_io_service::ptrSpec_); 
      simuInst[0]->eval(in, *m_io_service::ptrSpec_); 
     }  
     else { 
      if(map.size() < N) { 
      map.push_back(*m_io_service::ptrSpec_); 
      simuInst[map.size()-1]->eval(in, *m_io_service::ptrSpec_); 
      } 
      else { 
      for(size_t i=0; i<N;++i) { 
       if(map[i] == *m_io_service::ptrSpec_) { 
       simuInst[i]->eval(in, *m_io_service::ptrSpec_); 
       return; 
       } 
      } 
      } 
     } 
     } 
    }; 

    boost::thread_specific_ptr<boost::uuids::uuid> m_io_service::ptrSpec_; 
} 


    // Main class, create a distributed simulation based on a class Simulation 
    template <class Simulation> 
    class DistributedSimulation 
    { 
    public: 
    static const std::size_t N = impl::dispatcher::N; 

    protected: 
    impl::dispatcher _disp; 

    public: 
    DistributedSimulation() : _disp(Simulation()) {} 

    DistributedSimulation(Simulation& simuRef) 
    : _disp(simuRef) { } 


    // Simulation with a large (>>N) number of inputs 
    void eval(const std::vector<Simulation::input_t>& inputs, std::vector<Simulation::output_t>& outputs) 
    { 

     // Clear the results from a previous calculation (and stored in instances of class Simulation) 
     ... 

     // Creation of the pool using N threads 
     impl::m_io_service io_service; 
     boost::asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < N; ++i) 
     threads.create_thread(boost::bind(&impl::m_io_service::run, &io_service)); 

     // Adding tasks 
     for(std::size_t i = 0, i_end = inputs.size(); i<i_end; ++i) 
     io_service.post(boost::bind(&impl::dispatcher::dispatch, &_disp, inputs[i])); 

     // End of the tasks 
     io_service.stop(); 
     threads.join_all(); 

     // Gather the results iterating through instances of class simulation 
     ... 
    } 
    }; 

編輯

的代碼是我以前的解決方案的更新,同時考慮到特雷斯的評論。正如我之前所說,它更簡單易讀!

template <class Simulation> 
    class DistributedSimulation 
    { 
    public: 
     typedef typename Simulation::input_t input_t; 
     typedef typename Simulation::output_t output_t; 

     typedef boost::shared_ptr<Simulation> SimulationSPtr_t; 
     typedef boost::thread::id    id_t;  
     typedef std::map< id_t, std::size_t >::iterator IDMapIterator_t; 

    protected: 
     unsigned int     _NThreads; // Number of threads 
     std::vector<SimulationSPtr_t> _simuInst; // Instances of class Simulation 
     std::map< id_t, std::size_t > _IDMap;  // Map between thread id and instance index. 

    private: 
     boost::mutex _mutex; 

    public: 

     DistributedSimulation() {} 

     DistributedSimulation(const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency()) 
     { init(simuRef, NThreads); } 

     DistributedSimulation(const DistributedSimulation& simuDistrib) 
     { init(simuRef, NThreads); } 

     virtual ~DistributedSimulation() {} 

     void init(const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency()) 
     { 
     _NThreads = (NThreads == 0) ? 1 : NThreads; 
     _simuInst.resize(_NThreads); 
     for(std::size_t i=0; i<_NThreads; ++i) 
      _simuInst[i].reset(simuRef.clone()); 
     _IDMap.clear(); 
     } 


     void dispatch(const input_t& input) 
     { 
     // Get current thread id 
     boost::thread::id id0 = boost::this_thread::get_id(); 

     // Get the right instance 
     Simulation* sim = NULL;   
     { 
      boost::mutex::scoped_lock scoped_lock(_mutex); 
      IDMapIterator_t it = _IDMap.find(id0); 
      if(it != _IDMap.end()) 
      sim = _simuInst[it->second].get(); 
     } 

     // Simulation 
     if(NULL != sim) 
      sim->eval(input); 
     } 


     // Distributed evaluation. 
     void eval(const std::vector<input_t>& inputs, std::vector<output_t>& outputs) 
     { 
     //--Initialisation 
     const std::size_t NInputs = inputs.size(); 

     // Clear the ouptuts f(contained in instances of class Simulation) from a previous run 
     ... 

     // Create thread pool and save ids 
     boost::asio::io_service io_service; 
     boost::asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < _NThreads; ++i) 
     { 
      boost::thread* thread_ptr = threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service)); 
      _IDMap[ thread_ptr->get_id() ] = i; 
     } 

     // Add tasks 
     for(std::size_t i = 0; i < NInputs; ++i) 
      io_service.post(boost::bind(&DistributedSimulation::dispatch, this, inputs[i])); 

     // Stop the service 
     io_service.stop(); 
     threads.join_all(); 

     // Gather results (contained in each instances of class Simulation) 
     ... 
     } 
    }; 
0

這應該適用於您的應用程序。當您撥打io_service.post時,您將以inputs[i]作爲參數傳入模擬功能。在該函數中(大概是Simulation的成員函數),只需將計算結果存儲在Simulation對象中,然後在加入線程以收集輸出後迭代對象。

如果您需要確定執行工作的特定線程,您也可以將i作爲參數。這假設在模擬完成後收集輸出是可以的。

如果您在運行時需要訪問輸出,只需根據需要將函數post的輸出任務改爲io_service即可。確保使用互斥體保護任何共享數據結構!

+0

謝謝你的回答。 但是我不知道如何使用類Simulation的N個實例來確保線程'i'用實例'i'計算結果(因此使用它自己的數據來完成工作)。 處理輸出的方式似乎很好,謝謝! –

+0

@ gleeen.gould你能說清楚爲什麼使用線程'i'來計算模擬'i'的工作嗎?這是可能的(當你創建它時調用'thread-> get_id()',把它傳遞給'simulation [i]',然後讓'simulation [i]'的函數檢查它的執行時間,'boost :: this_thread :: get_id()'匹配,如果沒有返回,否則繼續),但我不認爲這是必要的。 – Tres

+0

看看我的解決方案,我希望這會澄清我的目標。 –