2011-08-28 39 views
1

一般而言,我看到用 "io_service + thread_group" 建立線程池是一種常見做法(common way to create thread pools)。它非常適合固定大小的線程池,或者只會變大的線程池。但我不知道如何在不停止整個 io_service 的情況下讓這樣的線程池變小。當 boost::thread_group 中的線程正在運行 boost::asio::io_service::run 時,如何縮小這個線程組?

因此,我們有如下所示的代碼:

// class variables
asio::io_service io_service;
boost::thread_group threads;
// Work object handed to io_service; it is allocated in the init code below
// and, per the Asio documentation, keeps run() from returning while the
// task queue is empty.
asio::io_service::work *work;

// some pool init function
work = new asio::io_service::work(io_service);
// hardware_concurrency() returns an unsigned value and reports 0 when the
// core count cannot be determined; fall back to one worker in that case so
// the pool is never created empty.
const unsigned detected_cores = boost::thread::hardware_concurrency();
const std::size_t cores_number = detected_cores != 0 ? detected_cores : 1;
for (std::size_t i = 0; i < cores_number; ++i)
    threads.create_thread(boost::bind(&asio::io_service::run, &io_service));

// and now we can simply post tasks
io_service.post(boost::bind(&class_name::an_expensive_calculation, this, 42));
io_service.post(boost::bind(&class_name::a_long_running_task, this, 123));

// and it is really easy to make the pool bigger - just call (mutexes may be required)
threads.create_thread(boost::bind(&asio::io_service::run, &io_service));

但是,如果我們想從線程池中刪除線程該怎麼辦?我們不能只是簡單地調用 threads.remove_thread(thread* thrd);,因爲它不會停止該線程的運行(依我之見)。所以我想知道——是否有可能、以及如何從這樣的池中真正刪除線程?(不是直接中斷它們,而是等到線程當前的任務執行完畢。)

更新:

下面是一些簡單、可編譯的代碼:一個對線程執行時間有限制的線程池。

#include <stdio.h> 
#include <iostream> 
#include <fstream> 

//Boost 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/locks.hpp> 

// Shared Asio service; every pool thread executes io_service.run() on it
// (see run()) and tasks are queued with io_service.post() (see main()).
boost::asio::io_service io_service; 
// Work object allocated in main(); it is never deleted in this program.
boost::asio::io_service::work *work; 
// Group that tracks the pool's thread handles.
boost::thread_group threads; 
// Mutex taken around threads.add_thread()/remove_thread() in pool_item() and run().
boost::mutex threads_creation; 
// Per-task time budget in milliseconds, set in main() and used by pool_item().
int time_limit; 

int calculate_the_answer_to_life_the_universe_and_everything(int i) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(i)); 
    std::cout << i << std::endl; 
    return i; 
} 

// Entry point of a pool thread.
//
// Processes io_service handlers until run() returns or a handler throws.
// When a std::exception escapes a handler, the thread reports it, takes
// `thread_ptr`'s raw handle out of the global group (under the
// threads_creation mutex) and then finishes.
//
// thread_ptr: handle of the thread executing this function; only its raw
//             pointer is used, as the key passed to remove_thread().
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
    }
    catch(std::exception &failure)
    {
        std::cout << "exeption: " << failure.what() << std::endl;
        {
            // The guard is released at the end of this scope, before the
            // confirmation message is printed.
            boost::mutex::scoped_lock guard(threads_creation);
            threads.remove_thread(thread_ptr.get());
        }
        std::cout << "thread removed from group" << std::endl;
    }
}

void pool_item(int i) 
{ 
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i)); 
    boost::unique_future<int> fi=pt.get_future(); 

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread 

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit))) 
    { 
     std::cout << "sucsess function returned: " << fi.get() << std::endl; 
    } 
    else 
    { 
     std::cout << "request took way 2 long!" << std::endl; 

     std::cout << "current group size:" << threads.size() << std::endl; 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 

     boost::mutex::scoped_lock lock(threads_creation); 
     threads.add_thread(thread.get()); 
     lock.unlock(); 

     task->join(); 

     throw std::runtime_error("killed joined thread"); 

    } 
} 

int main() 
{ 
    time_limit = 500; 

    work = new boost::asio::io_service::work(io_service); 
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    { 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 
     threads.add_thread(thread.get()); 
    } 

    int i = 800; 
    io_service.post(boost::bind(pool_item, i)); 

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2)); 
    std::cout << "thread should be removed by now." << std::endl 
     << "group size:" << threads.size() << std::endl; 

    std::cin.get(); 
    return 0; 
} 

正如你所看到的,即使調用了 .remove_thread(ptr);,線程也沒有從線程池中刪除 =( 爲什麼?

更新#2:

不管怎樣,我最終寫了一個自定義的線程組...

#include <stdio.h> 
#include <iostream> 
#include <fstream> 
#include <set> 

//Boost 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/locks.hpp> 
#include <boost/weak_ptr.hpp> 

//cf service interface 
//#include <service.hpp> 

//cf-server 
//#include <server.h> 

#include <boost/foreach.hpp> 

class thread_group 
{ 
public: 
    void add(boost::shared_ptr<boost::thread> to_add) 
    { 
     boost::mutex::scoped_lock lock(m); 
     ds_.insert(to_add); 
    } 
    void remove(boost::shared_ptr<boost::thread> to_remove) 
    { 
     boost::mutex::scoped_lock lock(m); 
     ds_.erase(to_remove); 
    } 

    int size() 
    { 
     boost::mutex::scoped_lock lock(m); 
     return ds_.size(); 
    } 

    void join_all(boost::posix_time::milliseconds interuption_time=boost::posix_time::milliseconds(1000)) 
    { 
     boost::mutex::scoped_lock lock(m); 
     BOOST_FOREACH(boost::shared_ptr<boost::thread> t, ds_) 
     { 
      boost::thread interrupter(boost::bind(&thread_group::interupt_thread, this, t, interuption_time)); 
     } 
    } 

private: 
    std::set< boost::shared_ptr<boost::thread> > ds_; 
    boost::mutex m; 
    void interupt_thread(boost::shared_ptr<boost::thread> t, boost::posix_time::milliseconds interuption_time) 
    { 
     try 
     { 
      if(!t->timed_join(interuption_time)) 
       t->interrupt(); 

     } 
     catch(std::exception &e) 
     { 
     } 
    } 
}; 

// Shared Asio service; every pool thread executes io_service.run() on it
// (see run()) and tasks are queued with io_service.post() (see main()).
boost::asio::io_service io_service; 
// Work object allocated in main(); it is never deleted in this program.
boost::asio::io_service::work *work; 
// Custom, mutex-protected group that tracks the pool's thread handles.
thread_group threads; 
// Per-task time budget in milliseconds, set in main() and used by pool_item().
int time_limit; 



int calculate_the_answer_to_life_the_universe_and_everything(int i) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(i)); 
    std::cout << i << std::endl; 
    return i; 
} 

// Entry point of a pool thread.
//
// Processes io_service handlers until run() returns or a handler throws.
// When a std::exception escapes a handler, the thread reports it, removes
// `thread_ptr` from the global thread group and then finishes.
//
// thread_ptr: handle of the thread executing this function, used as the key
//             for removal from the group.
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
    }
    catch(std::exception &failure)
    {
        std::cout << "exeption: " << failure.what() << std::endl;
        threads.remove(thread_ptr);
        std::cout << "thread removed from group" << std::endl;
    }
}

void pool_item(int i) 
{ 
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i)); 
    boost::unique_future<int> fi=pt.get_future(); 

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread 

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit))) 
    { 
     std::cout << "sucsess function returned: " << fi.get() << std::endl; 
    } 
    else 
    { 
     std::cout << "request took way 2 long!" << std::endl; 

     std::cout << "current group size:" << threads.size() << std::endl; 
     std::cout << "we want to add thread!" << std::endl; 
     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     threads.add(thread); 
     std::cout << "thread added" << std::endl 
      << "current group size:" << threads.size() << std::endl; 
     task->join(); 

     throw std::runtime_error("killed joined thread"); 

    } 
} 

int main() 
{ 
    time_limit = 500; 

    work = new boost::asio::io_service::work(io_service); 
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    { 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 
     threads.add(thread); 
    } 

    int i = 800; 
    io_service.post(boost::bind(pool_item, i)); 

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2)); 
    std::cout << "thread should be removed by now." << std::endl 
     << "group size:" << threads.size() << std::endl; 

    std::cin.get(); 
    return 0; 
} 

回答

3

我過去曾利用「回調拋出異常時 run() 會退出」這一事實做到這一點。我不直接在線程中啓動 run(),而是調用一個工具函數,它會在拋出相應的異常時讓線程退出:

// Thread entry point used instead of calling io_service.run() directly.
//
// run() stops processing when a handler throws; the exception is absorbed
// here so the thread simply ends, which shrinks the pool by one thread.
void RunIOService()
{
    try
    {
        io_service.run();
    }
    catch (const std::exception &)
    {
        // Caught by const reference (catching by value copies and slices the
        // exception). Intentionally empty: the exception is only the signal
        // for this thread to leave run() and finish.
    }
}

然後,你只需要安排一個會拋出異常的回調:

// Handler meant to end the pool thread that happens to execute it.
// The body is a placeholder: as written it throws nothing, so the thread
// keeps running until a real throw statement is filled in.
static void KillThreadCallback() 
{ 
    // Placeholder: throw an exception of a type that the catch block around
    // io_service.run() in the thread's entry function handles.
} 

// Queue the handler; it is executed by one of the threads running the io_service.
io_service.post(&KillThreadCallback); 

這將導致執行此回調的線程退出,實質上使線程池的線程數減少 1。使用這種方法,你可以很容易地擴大和收縮 io_service 線程池。它可用於乾淨地關閉 I/O 服務(使用 C++0x lambda 表達式)。

+0

請參閱我的帖子更新。 – Rella

1

一種模式:

// Body of an I/O service thread: executes handlers one at a time for as long
// as m_keepRunning stays true. The flag is checked before every handler, and
// a std::exception thrown by a handler does not end the loop.
void ThreadLoop()
{
    for (;;) {
        if (!m_keepRunning) {
            break;
        }

        try {
            io_service.run_one();
        } catch (const std::exception& error) {
            // error handling
        }
    }
}

// Asks the loop in ThreadLoop() to finish by posting a handler that clears
// m_keepRunning. The flag is changed from inside a handler rather than
// directly, so it is only touched on the I/O service thread.
//
// Two equivalent ways of posting the handler are shown; as written, both
// are queued.
void Stop()
{
    // Using C++0x lambdas. `this` is captured explicitly: relying on [=] to
    // capture it implicitly is deprecated since C++20.
    io_service.post([this]{ m_keepRunning = false; });
    // or
    io_service.post(boost::bind(&ThisClass::StopCallback, this));
}

// Handler posted by Stop(): clears the flag tested by the while condition in
// ThreadLoop(), so the loop ends at its next check.
void StopCallback() 
{ 
    m_keepRunning = false; 
} 

m_keepRunning 是一個成員變量,只應在 I/O 服務線程中訪問。

相關問題