2016-07-20 51 views
1

我正試圖用C++ 11併發支持來做到這一點。正確的方式來等待由多個線程通知的條件變量

我有一個線程池的工作線程都做同樣的事情,其中​​一個主線程有一個條件變量數組(每個線程一個,他們需要'開始'同步,即不運行一個循環)。

for (auto &worker_cond : cond_arr) { 
     worker_cond.notify_one(); 
    } 

然後,此線程必須等待池的每個線程的通知再次重新啓動其週期。什麼是這樣做的正確方法?有一個單一的條件變量,並等待一些整數每個線程是不是主要增加?像(仍然在主線程)

unique_lock<std::mutex> lock(workers_mtx); 
    workers_finished.wait(lock, [&workers] { return workers = cond_arr.size(); }); 
+1

快速問答:你有沒有考慮過['std :: experimental :: barrier'](http://en.cppreference.com/w/cpp/experimental/barrier)? –

回答

1

我看到兩個選項:

選項1:join()

基本上而不是使用條件變量在你的線程開始計算,你爲每次迭代產生一個新線程,並使用join()等待它完成。然後你爲下一次迭代產生新的線程,以此類推。

選項2:鎖定

你不想要,只要一個線程仍在工作的主線程通知。所以每個線程都有自己的鎖,在執行計算之前鎖定它,然後解鎖。你的主線程在調用notify()之前鎖定它們,並在之後解鎖它們。

+0

對於第一個,不會更容易使用。第二個不起作用,因爲它可能發生,當主線程正在等待某個具有不同鎖的其他線程的鎖時,可以循環多次,這是我不想要的! – Aram

+0

如果在每個循環之後工作線程釋放鎖,然後等待條件變量並且只有在通知重新鎖定並且執行一個週期時纔會發生。 – Anedar

1

我沒有看到你的解決方案沒有根本的錯誤。

防護workersworkers_mtx並完成。

我們可以用計數信號對此進行抽象。

struct counting_semaphore { 
    std::unique_ptr<std::mutex> m=std::make_unique<std::mutex>(); 
    std::ptrdiff_t count = 0; 
    std::unique_ptr<std::condition_variable> cv=std::make_unique<std::condition_variable>(); 

    counting_semaphore(std::ptrdiff_t c=0):count(c) {} 
    counting_semaphore(counting_semaphore&&)=default; 

    void take(std::size_t n = 1) { 
    std::unique_lock<std::mutex> lock(*m); 
    cv->wait(lock, [&]{ if (count-std::ptrdiff_t(n) < 0) return false; count-=n; return true; }); 
    } 
    void give(std::size_t n = 1) { 
    { 
     std::unique_lock<std::mutex> lock(*m); 
     count += n; 
     if (count <= 0) return; 
    } 
    cv->notify_all(); 
    } 
}; 

take需要count路程,塊如果沒有足夠的。

give增加到count,並通知是否有正數量。

現在工作者在兩個信號量之間線程化渡輪令牌。

std::vector<counting_semaphore> m_worker_start{count}; 
counting_semaphore m_worker_done{0}; // not count, zero 
std::atomic<bool> m_shutdown = false; 

// master controller: 
for (each step) { 
    for (auto&& starts:m_worker_start) 
    starts.give(); 
    m_worker_done.take(count); 
} 

// master shutdown: 
m_shutdown = true; 
// wake up forever: 
for (auto&& starts:m_worker_start) 
    starts.give(std::size_t(-1)/2); 

// worker thread: 
while (true) { 
    master->m_worker_start[my_id].take(); 
    if (master->m_shutdown) return; 
    // do work 
    master->m_worker_done.give(); 
} 

或某些情況。

live example