2014-02-07 44 views
3

我已經實現了How to create a thread pool using boost in C++?的解決方案,但是我的線程在io_service :: stop()函數停止進一步處理時遇到問題。在C++中使用Boost的線程池無法正常工作

在我的情況下,我在我的池中有3個線程,並試圖通過它運行11000條記錄。每個記錄都獨立於其他記錄,所以我只是想通過創建每個記錄的並行運行來加速處理。

void processRecord (unsigned int i, unsigned int numRecords) 
{ 
    cout << i << "/" << numRecords << endl; 

    // do Processing... 
} 

#define MAX_THREADS 3 
unsigned int numRecords=11000 

boost::asio::io_service ioService; 
boost::thread_group threadPool; 

boost::asio::io_service::work work (ioService); 

for (unsigned int i=0 ; i<MAX_THREADS ; ++i) 
{ 
    threadPool.create_thread (boost::bind (&boost::asio::io_service::run, &ioService)); 
} 

for (unsigned int i=0 ; i<numRecords ; ++i) 
{ 
    ioService.post (boost::bind (processRecord, i, numRecords); 
} 

// ioService.stop();   // Was causing ioService to stop 
work.reset();     // Wait for all work to be finished. 
threadPool.join_all(); 

processAllRecords(); 

我看到的問題是,當調用ioService.post()是推動流程入池完成循環後,它擊中ioService.stop()調用並停止所有進一步的處理。這通常發生在實際處理了大約400條記錄之後。

因此,只有約400個〜11000條記錄正在處理中。我很新來使用C++中的線程,所以我不知道我缺少什麼或如何糾正這個問題。任何幫助將不勝感激。

編輯:我修改了上面的代碼,以反映我爲使其工作所做的更改。實質上,ioService.stop()調用導致所有進一步處理停止。我用work.wait()替換了它,以便它等到所有工作完成。

編輯2:我在以前的編輯中使用了錯誤的功能。它應該是work.reset()。

+2

在完成之前不要致電停止嗎? – sehe

+1

我剛剛發佈,我發現解決方案在這裏:http://www.tonicebrian.com/2012/05/23/thread-pool-in-c/。而不是調用ioService.stop()我應該調用work.reset()。這將等待所有的「工作」完成,然後我可以執行join_all()。這解決了這個問題,現在我有另一個追蹤這是不相關的。 –

+1

這將是最有用的,如果你發佈你的答案讓其他人找到它(和upvote!) – sehe

回答

0

使用你的代碼,我爲線程組使用boost :: asio的方式, 將括號括起來並使用scoped_ptr。只是一個想法。

void processRecord (unsigned int i, unsigned int numRecords) 
{ 
    cout << i << "/" << numRecords << endl; 

    // do Processing... 
} 

#define MAX_THREADS 3 
unsigned int numRecords=11000 

boost::asio::io_service ioService; 
boost::thread_group threadPool; 

// by using a scoped pointer for the io_service::work 
// and enclosing the threading in brackets 
// this should run until all the jobs have finished 
// and you don't need to call work.reset() 

// added brackets around threading 
{ 
    // made work a boost::scoped_ptr 
    boost::scoped_ptr<boost::asio::io_service::work> 
      work (new boost::asio::io_service(ioService)); 

    for (unsigned int i=0 ; i<MAX_THREADS ; ++i) 
    { 
     threadPool.create_thread (
      boost::bind (&boost::asio::io_service::run, &ioService)); 
    } 

    for (unsigned int i=0 ; i<numRecords ; ++i) 
    { 
     ioService.post (boost::bind (processRecord, i, numRecords); 
    } 
} 
// now just have to join 
threadPool.join_all(); 

processAllRecords();