在C++ 11,I具有其管理一個數字,經由單一lambda函數排隊的線程的線程池對象。我知道我需要處理多少行數據,所以我提前知道我需要對N個作業進行排隊。我不確定的是如何判斷所有這些工作何時完成,因此我可以繼續下一步。如何判斷我的ThreadPool何時完成其任務?
這是管理的線程池代碼:
#include <cstdlib>
#include <vector>
#include <deque>
#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
class ThreadPool;
class Worker {
public:
Worker(ThreadPool &s) : pool(s) { }
void operator()();
private:
ThreadPool &pool;
};
class ThreadPool {
public:
ThreadPool(size_t);
template<class F>
void enqueue(F f);
~ThreadPool();
void joinAll();
int taskSize();
private:
friend class Worker;
// the task queue
std::deque< std::function<void()> > tasks;
// keep track of threads
std::vector<std::thread> workers;
// sync
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
void Worker::operator()()
{
std::function<void()> task;
while(true)
{
{ // acquire lock
std::unique_lock<std::mutex>
lock(pool.queue_mutex);
// look for a work item
while (!pool.stop && pool.tasks.empty()) {
// if there are none wait for notification
pool.condition.wait(lock);
}
if (pool.stop) {// exit if the pool is stopped
return;
}
// get the task from the queue
task = pool.tasks.front();
pool.tasks.pop_front();
} // release lock
// execute the task
task();
}
}
// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0;i<threads;++i) {
workers.push_back(std::thread(Worker(*this)));
}
//workers.
//tasks.
}
// the destructor joins all threads
ThreadPool::~ThreadPool()
{
// stop all threads
stop = true;
condition.notify_all();
// join them
for (size_t i = 0;i<workers.size();++i) {
workers[i].join();
}
}
void ThreadPool::joinAll() {
// join them
for (size_t i = 0;i<workers.size();++i) {
workers[i].join();
}
}
int ThreadPool::taskSize() {
return tasks.size();
}
// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
{ // acquire lock
std::unique_lock<std::mutex> lock(queue_mutex);
// add the task
tasks.push_back(std::function<void()>(f));
} // release lock
// wake up one thread
condition.notify_one();
}
然後分發我的工作線程之間是這樣的:
ThreadPool pool(4);
/* ... */
for (int y=0;y<N;y++) {
pool->enqueue([this,y] {
this->ProcessRow(y);
});
}
// wait until all threads are finished
std::this_thread::sleep_for(std::chrono::milliseconds(100));
等待100毫秒的作品,只是因爲我知道這些工作可以完成時間少於100毫秒,但顯然它不是最好的方法。一旦完成了N行處理,它需要經歷另外1000代左右的相同事情。顯然,我想盡快開始下一代。
我知道一定有某種方式將代碼添加到我的線程池,這樣我可以做這樣的事情:
while (pool->isBusy()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
我一直在做這個了幾晚,現在我覺得很難找到如何做到這一點的好例子。 那麼,這將是對我是否履行isBusy()方法的正確方法?
使用條件變量和標誌。等待條件變量並在其謂詞中測試該標誌。 –
我想問一下,如果你考慮使用像英特爾的Threading Buildings Blocks這樣的東西?也許在BOOST中有很多有用的東西,而且微軟也有自己的庫。創建自己的線程池通常是最後的手段,以防萬一你真的需要現有的線程池不能提供的東西,像英特爾的TBB這樣的庫也不會那樣做。但在這裏,我可以想象,這些任務將安排在TBB上,然後等待... – ipavlu
@ipavlu,它可能使用C++ 11編寫多線程代碼而無需額外的庫。我試圖避免額外的大量依賴。我解決了這個問題,並提供了我自己的答案。 – Octopus