2017-02-16 18 views
0

我正在C++中進行多線程。這可能是非常標準的東西,但我似乎無法在任何地方找到它或知道任何關鍵術語來在線搜索它。如何搜索下一個可用的線程來計算

我想多次執行某種計算,但有多個線程。對於每次迭代的計算,我想找到下一個可用線程完成其前一次計算以進行下一次迭代。我不想按順序循環線程,因爲下一個被調用的線程可能還沒有完成它的工作。

E.g. 假設我有一個int向量,並且我想總計有5個線程的總數。我有要更新的總額存儲在某個地方和我目前正在使用的元素的數量。每個線程都會查看計數以查看下一個位置,然後獲取該矢量值並將其添加到總和中。然後它返回尋找計數來做下一次迭代。因此,對於每次迭代,計數遞增然後查找下一個可用線程(也許一個已經在等待計數;或者可能它們仍然忙於工作)來執行下一次迭代。我們不增加線程的數量,但是我希望能夠以某種方式搜索第一個完成下一個計算的所有5個線程。

我將如何去編碼這個。我所知道的每一種方式都涉及到通過線程進行循環,以至於我無法檢查下一個可能出現故障的可用程序。

+0

爲了記錄,總結一個'vector'是通過工作者線程協調任務的一個可怕情況,這些線程急切地從一組共同的值中抽取出來;要做的工作量很小,同步的成本只能確保一次計算的每個值都很高。預先對數據進行分區更有意義,因爲它消除了對同步的需求(除了在結合其結果之前等待所有線程完成),並且可以預測每個線程的數據訪問模式(適用於任何內存系統預取啓發式)。 – ShadowRanger

回答

0

在全局變量上使用semafor(或互斥體,總是混淆這兩個),告訴你下一步是什麼。只要您訪問變量使得線程訪問清除,semafor將鎖定其他線程。

所以,假設你有一個X元素的數組。和全球名爲nextfree女巫initalized爲0,那麼psudo代碼是這樣的:

while (1) 
{ 
    <lock semafor INT> 
    if (nextfree>=X) 
    { 
     <release semnafor INT> 
     <exit and terminate thread> 
    } 
    <Get the data based on "nextfree"> 
    nextfree++; 
    <release semafor INT> 

    <do your stuff withe the chunk you got> 
} 

這裏的關鍵是,每個線程將會單獨有semafor鎖內EXLUSIVE訪問數據結構和因此無論其他人在做什麼,都可以訪問下一個。 (其他線程必須等待,如果他們完成,而另一個線程正在獲取下一個數據塊。當你釋放只有隊列中的一個將獲得訪問權限,其餘的將不得不等待。)

那裏有些東西需要保護。如果您設法退出錯誤的位置(使用釋放它)或創建死鎖,Semafor可能會鎖定您的系統。

0

這是一個線程池:

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 
struct thread_pool { 
    thread_pool(std::size_t n = 1) { start_thread(n); } 
    thread_pool(thread_pool&&) = delete; 
    thread_pool& operator=(thread_pool&&) = delete; 
    ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R()> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> run_task(F task) { 
    if (threads_active() >= total_threads()) { 
     start_thread(); 
    } 
    return queue_task(std::move(task)); 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::size_t threads_active() const { 
    return active; 
    } 
    std::size_t total_threads() const { 
    return threads.size(); 
    } 
    void clear_threads() { 
    terminate(); 
    threads.clear(); 
    } 
    void start_thread(std::size_t n = 1) { 
    while(n-->0) { 
     threads.push_back(
     std::async(std::launch::async, 
      [this]{ 
      while(auto task = tasks.pop_front()) { 
       ++active; 
       try{ 
       (*task)(); 
       } catch(...) { 
       --active; 
       throw; 
       } 
       --active; 
      } 
      } 
     ) 
    ); 
    } 
    } 
private: 
    std::vector<std::future<void>> threads; 
    threaded_queue<std::packaged_task<void()>> tasks; 
    std::atomic<std::size_t> active; 
}; 

你給它多線程或者在建或通過start_thread如何。

然後您queue_task。這會返回一個std::future,告訴你任務何時完成。

隨着線程完成任務,他們去threaded_queue並尋找更多。

threaded_queue被銷燬時,它會中止其中的所有數據。

thread_pool被銷燬時,它會中止所有將來的任務,然後等待所有未完成的任務完成。

Live example