2015-11-11 36 views
3

在C++ 11,I具有其管理一個數字,經由單一lambda函數排隊的線程的線程池對象。我知道我需要處理多少行數據,所以我提前知道我需要對N個作業進行排隊。我不確定的是如何判斷所有這些工作何時完成,因此我可以繼續下一步。如何判斷我的ThreadPool何時完成其任務?

這是管理的線程池代碼:

#include <cstdlib> 
#include <vector> 
#include <deque> 
#include <iostream> 
#include <atomic> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 

class ThreadPool; 

class Worker { 
public: 
    Worker(ThreadPool &s) : pool(s) { } 
    void operator()(); 
private: 
    ThreadPool &pool; 
}; 

class ThreadPool { 
public: 
    ThreadPool(size_t); 
    template<class F> 
    void enqueue(F f); 
    ~ThreadPool(); 
    void joinAll(); 
    int taskSize(); 

private: 
    friend class Worker; 

    // the task queue 
    std::deque< std::function<void()> > tasks; 

    // keep track of threads 
    std::vector<std::thread> workers; 

    // sync 
    std::mutex queue_mutex; 
    std::condition_variable condition; 
    bool stop; 
}; 

void Worker::operator()() 
{ 
    std::function<void()> task; 
    while(true) 
    { 
     { // acquire lock 
      std::unique_lock<std::mutex> 
       lock(pool.queue_mutex); 

      // look for a work item 
      while (!pool.stop && pool.tasks.empty()) { 
       // if there are none wait for notification 
       pool.condition.wait(lock); 
      } 

      if (pool.stop) {// exit if the pool is stopped 
       return; 
      } 

      // get the task from the queue 
      task = pool.tasks.front(); 
      pool.tasks.pop_front(); 

     } // release lock 

     // execute the task 
     task(); 
    } 
} 


// the constructor just launches some amount of workers 
ThreadPool::ThreadPool(size_t threads) 
    : stop(false) 
{ 
    for (size_t i = 0;i<threads;++i) { 
     workers.push_back(std::thread(Worker(*this))); 
    } 

    //workers. 
    //tasks. 
} 

// the destructor joins all threads 
ThreadPool::~ThreadPool() 
{ 
    // stop all threads 
    stop = true; 
    condition.notify_all(); 

    // join them 
    for (size_t i = 0;i<workers.size();++i) { 
     workers[i].join(); 
    } 
} 

void ThreadPool::joinAll() { 
    // join them 
    for (size_t i = 0;i<workers.size();++i) { 
     workers[i].join(); 
    } 
} 

int ThreadPool::taskSize() { 
    return tasks.size(); 
} 

// add new work item to the pool 
template<class F> 
void ThreadPool::enqueue(F f) 
{ 
    { // acquire lock 
     std::unique_lock<std::mutex> lock(queue_mutex); 

     // add the task 
     tasks.push_back(std::function<void()>(f)); 
    } // release lock 

    // wake up one thread 
    condition.notify_one(); 
} 

然後分發我的工作線程之間是這樣的:

ThreadPool pool(4); 
/* ... */ 
for (int y=0;y<N;y++) { 
    pool->enqueue([this,y] { 
     this->ProcessRow(y); 
    }); 
} 

// wait until all threads are finished 
std::this_thread::sleep_for(std::chrono::milliseconds(100)); 

等待100毫秒的作品,只是因爲我知道這些工作可以完成時間少於100毫秒,但顯然它不是最好的方法。一旦完成了N行處理,它需要經歷另外1000代左右的相同事情。顯然,我想盡快開始下一代。

我知道一定有某種方式將代碼添加到我的線程池,這樣我可以做這樣的事情:

while (pool->isBusy()) { 
    std::this_thread::sleep_for(std::chrono::milliseconds(1)); 
} 

我一直在做這個了幾晚,現在我覺得很難找到如何做到這一點的好例子。 那麼,這將是對我是否履行isBusy()方法的正確方法?

+1

使用條件變量和標誌。等待條件變量並在其謂詞中測試該標誌。 –

+0

我想問一下,如果你考慮使用像英特爾的Threading Buildings Blocks這樣的東西?也許在BOOST中有很多有用的東西,而且微軟也有自己的庫。創建自己的線程池通常是最後的手段,以防萬一你真的需要現有的線程池不能提供的東西,像英特爾的TBB這樣的庫也不會那樣做。但在這裏,我可以想象,這些任務將安排在TBB上,然後等待... – ipavlu

+0

@ipavlu,它可能使用C++ 11編寫多線程代碼而無需額外的庫。我試圖避免額外的大量依賴。我解決了這個問題,並提供了我自己的答案。 – Octopus

回答

1

您可能需要創建具有布爾變量標誌相關聯的線程的內部結構。

class ThreadPool { 
private: 
    // This Structure Will Keep Track Of Each Thread's Progress 
    struct ThreadInfo { 
     std::thread thread; 
     bool  isDone; 

     ThreadInfo(std::thread& threadIn) : 
      thread(threadIn), isDone(false) 
     {} 
    }; // ThredInfo 

    // This Vector Should Be Populated In The Constructor Initially And 
    // Updated Anytime You Would Add A New Task. 
    // This Should Also Replace // std::vector<std::thread> workers 
    std::vector<ThreadInfo> workers; 

public: 
    // The rest of your class would appear to be the same, but you need a 
    // way to test if a particular thread is currently active. When the 
    // thread is done this bool flag would report as being true; 

    // This will only return or report if a particular thread is done or not 
    // You would have to set this variable's flag for a particular thread to 
    // true when it completes its task, otherwise it will always be false 
    // from moment of creation. I did not add in any bounds checking to keep 
    // it simple which should be taken into consideration. 
    bool isBusy(unsigned idx) const { 
     return workers[idx].isDone; 
    } 
}; 
+0

如果將有比線程更多的作業並且線程池必須通過接收到的作業慢慢地燒掉會怎麼樣? – ipavlu

+0

@ipavlu我明白你在說什麼;但我正在回答所問的問題。用戶問什麼是我實施他們的'isBusy()'方法的正確方法。用戶並沒有問什麼是最好的解決方案。但是,由於這可能比其他方法更昂貴,因此它將bool標誌與特定線程相關聯。這僅表示線程是否處於活動狀態。如果每個布爾旗都是真的,那麼他們可以繼續下一步。 –

0

如果你有N個就業機會,他們必須通過調用線程睡眠被期待已久的,那麼最有效的方法是創建某處一個變量,會被一個原子操作調度作業前設置爲N並且在完成計算後的每個作業中,都會有變量的原子減量。然後你可以使用原子指令來測試變量是否爲零。

或鎖定減量與等待句柄,當變量將遞減至零。

我必須說,我不喜歡這個想法,你所要求的:

while (pool->isBusy()) { 
    std::this_thread::sleep_for(std::chrono::milliseconds(1)); 
} 

它只是不合身,也不會是1ms的幾乎沒有,它是利用資源等不必要的...

最好的方法是原子方式減小一些變量,並測試原子變量,如果全部完成,最後的工作會簡單地基於原子測試設置WaitForSingleObject的。 以及是否必須,等待的將是WaitForSingleObject的,並會結束後醒來,次數並不多。

WaitForSingleObject

2

我知道了!

首先,我介紹了一些額外的成員ThreadPool類:

class ThreadPool { 
    /* ... exisitng code ... */ 
    /* plus the following */ 
    std::atomic<int> njobs_pending; 
    std::mutex main_mutex; 
    std::condition_variable main_condition; 
} 

現在,我可以做的比檢查一些狀態的時間每X量更好。現在,我可以阻止主循環,直到沒有更多的就業機會正在等待:

void ThreadPool::waitUntilCompleted(unsigned n) { 
    std::unique_lock<std::mutex> lock(main_mutex); 
    main_condition.wait(lock); 
} 

只要我管理什麼的等待與下面的代碼簿記,在ThreadPool.enqueue()函數的開始:

njobs_pending++; 

,並在之後我運行在工任務::運算符()()函數:

if (--pool.njobs_pending == 0) { 
    pool.main_condition.notify_one(); 
} 

然後主線程可以排隊任何任務是必要的,然後坐下來,等到所有的C通過以下方式完成:

for (int y=0;y<N;y++) { 
    pool->enqueue([this,y] { 
     this->ProcessRow(y); 
    }); 
} 
pool->waitUntilCompleted(); 
相關問題