2011-10-16 34 views
1

我正在尋找等待多個作業完成的方法,然後執行另一個完全不同數量的作業。當然有線程。簡單的解釋: 我創建了兩個工作線程,都在io_service上執行。下面的代碼取自hereboost asio需要在m個作業完成後發佈n個作業

爲了簡單起見,我創造了兩種類型的工作,CalculateFibCalculateFib2。我想CalculateFib2作業到開始後,只有在CalculateFib作業完成。我嘗試使用條件變量,如here所述,但如果CalculateFib2作業不止一個,程序將掛起。我究竟做錯了什麼?

THX,dodol

#include <boost/asio.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/mutex.hpp> 
#include <boost/bind.hpp> 
#include <iostream> 

boost::mutex global_stream_lock; 
boost::mutex mx; 
boost::condition_variable cv; 

void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service) 
{ 
    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
     << "] Thread Start" << std::endl; 
    global_stream_lock.unlock(); 

    io_service->run(); 

    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
     << "] Thread Finish" << std::endl; 
    global_stream_lock.unlock(); 
} 

size_t fib(size_t n) 
{ 
    if (n <= 1) 
    { 
     return n; 
    } 
    boost::this_thread::sleep( 
     boost::posix_time::milliseconds(1000) 
     ); 
    return fib(n - 1) + fib(n - 2); 
} 

void CalculateFib(size_t n) 
{ 
    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
     << "] Now calculating fib(" << n << ") " << std::endl; 
    global_stream_lock.unlock(); 

    size_t f = fib(n); 

    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
     << "] fib(" << n << ") = " << f << std::endl; 
    global_stream_lock.unlock(); 

    boost::lock_guard<boost::mutex> lk(mx); 
    cv.notify_all(); 
} 

void CalculateFib2(size_t n) 
{ 
    boost::unique_lock<boost::mutex> lk(mx); 
    cv.wait(lk); 

    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
     << "] Now calculating fib2(" << n << ") " << std::endl; 
    global_stream_lock.unlock(); 

    size_t f = fib(n); 

    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
     << "] fib2(" << n << ") = " << f << std::endl; 
    global_stream_lock.unlock(); 
} 
int main(int argc, char * argv[]) 
{ 
    boost::shared_ptr<boost::asio::io_service> io_service(
     new boost::asio::io_service 
     ); 
    boost::shared_ptr<boost::asio::io_service::work> work(
     new boost::asio::io_service::work(*io_service) 
     ); 

    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
     << "] The program will exit when all work has finished." 
     << std::endl; 
    global_stream_lock.unlock(); 

    boost::thread_group worker_threads; 
    for(int x = 0; x < 2; ++x) 
    { 
     worker_threads.create_thread( 
      boost::bind(&WorkerThread, io_service) 
      ); 
    } 
    io_service->post(boost::bind(CalculateFib, 5)); 
    io_service->post(boost::bind(CalculateFib, 4)); 
    io_service->post(boost::bind(CalculateFib, 3)); 

    io_service->post(boost::bind(CalculateFib2, 1)); 
    io_service->post(boost::bind(CalculateFib2, 1)); 
    work.reset(); 
    worker_threads.join_all(); 

    return 0; 
} 

回答

2

裏面CalculateFib2你要做的第一件事就是等待條件cv)。 只有這個條件在CalculateFib的末尾得到信號。所以,理由是執行永不繼續,除非條件觸發(通過發佈CalculateFib)作業。

事實上,加入任何其他行,像這樣:

io_service->post(boost::bind(CalculateFib, 5)); 
io_service->post(boost::bind(CalculateFib, 4)); 
io_service->post(boost::bind(CalculateFib, 3)); 

io_service->post(boost::bind(CalculateFib2, 1)); 
io_service->post(boost::bind(CalculateFib2, 1)); 

io_service->post(boost::bind(CalculateFib, 5)); // <-- ADDED 

使得執行運行完。

在努力擺脫更多的光線:如果您隔離FIB2批次(時間),如

io_service->post(boost::bind(CalculateFib, 5)); 
io_service->post(boost::bind(CalculateFib, 4)); 
io_service->post(boost::bind(CalculateFib, 3)); 

boost::this_thread::sleep(boost::posix_time::seconds(10)); 
io_service->post(boost::bind(CalculateFib2, 1)); 
io_service->post(boost::bind(CalculateFib2, 1)); 
io_service->post(boost::bind(CalculateFib2, 1)); 
io_service->post(boost::bind(CalculateFib2, 1)); 
io_service->post(boost::bind(CalculateFib2, 1)); 
io_service->post(boost::bind(CalculateFib2, 1)); 

所有FIB2作業將一直阻塞,無論線程數,因爲一個Fib工作過所有人在發佈之前退出。一個簡單的

io_service->post(boost::bind(CalculateFib, 1)); 

將解鎖所有的服務員(即只多達有等待的線程,這是可用線程-1的數量,因爲纖維蛋白原()的工作佔據一個線程爲好。現在用< 7線程這會死鎖,因爲沒有可供甚至開始一個Fib()工作(所有線程被阻塞等待FIB2)


說實話,我不明白你是什麼的線程試圖通過調度來實現,我懷疑你應該監視工作隊列並明確地發佈工作('任務')只有當你達到所需數量的項目。這樣你就可以KISS並獲得一個非常靈活的界面來安排你的工作。

通常,使用線程組(pooling),您希望避免無限期地阻塞線程。這有可能導致您的工作安排陷入僵局,並且執行效果不佳。

+0

是的,我只想在其他一些工作完成時發佈工作,因爲他們依賴於第一組工作創建的結果。我不知道該怎麼做,也就是說,我怎麼知道工作組何時完成?順便說一句,thx爲上述主題的透徹解釋。 – dodol

+0

@dodol:你描述的意圖仍然意味着許多不同的東西。但是,我會考慮簡單地在'依賴作業'中發佈後續作業(因此您可以將作業線程函數作爲最初作業的參數 - 這非常類似於_ [continuation passing style](http: //en.wikipedia.org/wiki/Continuation-passing_style)_) – sehe

+0

我希望能夠在多個線程上發佈一組作業,然後在作業結束時,另一組作業也在多個作業上線程。這個循環多次循環,所以我認爲最好的辦法是在循環開始時創建io_service並創建線程,所以我不會在每次循環開始/結束時創建&銷燬線程。問題是我不知道我應該在什麼時候開始第二組工作,因爲我無法弄清楚第一個工作什麼時候完成。有什麼想法嗎? – dodol