2016-04-22 135 views
2

真正的代碼方式更復雜,但我認爲我設法制作了一個mcve。C++線程安全和notify_all()

我試圖做到以下幾點:

  1. 有一些線程做的工作
  2. 把他們都進入暫停狀態
  3. 喚醒他們第一次,等待它完成,再醒來第二個,等待它完成,喚醒了第三個。等等。

我使用的代碼下面,它似乎工作

std::atomic_int which_thread_to_wake_up; 
std::atomic_int threads_asleep; 
threads_asleep.store(0); 
std::atomic_bool ALL_THREADS_READY; 
ALL_THREADS_READY.store(false); 
int threads_num = .. // Number of threads 
bool thread_has_finished = false; 

std::mutex mtx; 
std::condition_variable cv; 

std::mutex mtx2; 
std::condition_variable cv2; 


auto threadFunction = [](int my_index) { 

    // some heavy workload here.. 
    .... 

    { 
     std::unique_lock<std::mutex> lck(mtx); 
     ++threads_asleep; 
     cv.notify_all(); // Wake up any other thread that might be waiting 
    } 

    std::unique_lock<std::mutex> lck(mtx); 
    bool all_ready = ALL_THREADS_READY.load(); 
    size_t index = which_thread_to_wake_up.load(); 
    cv.wait(lck, [&]() { 
     all_ready = ALL_THREADS_READY.load(); 
     index = which_thread_to_wake_up.load(); 
     return all_ready && my_index == index; 
    }); 

    // This thread was awaken for work! 
    .. do some more work that requires synchronization.. 

    std::unique_lock<std::mutex> lck2(mtx2); 
    thread_has_finished = true; 
    cv2.notify_one(); // Signal to the main thread that I'm done 
}; 

// launch all the threads.. 
std::vector<std::thread> ALL_THREADS; 
for (int i = 0; i < threads_num; ++i) 
    ALL_THREADS.emplace_back(threadFunction, i);  


// Now the main thread needs to wait for ALL the threads to finish their first phase and go to sleep  

std::unique_lock<std::mutex> lck(mtx); 
size_t how_many_threads_are_asleep = threads_asleep.load(); 
    while (how_many_threads_are_asleep < threads_num) { 
     cv.wait(lck, [&]() { 
     how_many_threads_are_asleep = threads_asleep.load(); 
     return how_many_threads_are_asleep == numThreads; 
     }); 
    } 

// At this point I'm sure ALL THREADS ARE ASLEEP! 

// Wake them up one by one (there should only be ONE awake at any time before it finishes his computation) 

for (int i = 0; i < threads_num; i++) 
{ 
    which_thread_to_wake_up.store(i); 
    cv.notify_all(); // (*) Wake them all up to check if they're the chosen one 
    std::unique_lock<std::mutex> lck2(mtx2); 
    cv2.wait(lck, [&]() { return thread_has_finished; }); // Wait for the chosen one to finish 
    thread_has_finished = false; 
} 

恐怕最後notify_all()調用(一個我標有(*))可能會導致以下情況:

  • 所有線程都睡着了
  • 所有線程都從喚醒主線程通過調用notify_all()
  • 具有正確索引的線程完成最後一次計算並釋放鎖
  • 所有其他線程■找被喚醒,但他們有沒有選中原子變量YET
  • 主線程發出第二notify_all(),這都丟失了(因爲線程都驚醒的是,他們並沒有簡單地檢查了原子能還)

這會發生嗎?我找不到notify_all()的任何措辭,如果它的電話是緩衝或與實際檢查條件變量的函數同步的順序。

+0

我沒有仔細研究過你的代碼,但是你描述的一般情況是可能的。條件變量不存儲信號。只有那些在發送信號時等待條件變量的線程才能看到該信號。由信號喚醒的線程不再等待,即使它們還沒有從'wait()'函數返回。 –

回答

0

按上(notify_all

notify_all的文檔只有一半的要求,繼續一個線程。條件陳述也必須是真實的。所以必須有一個交通警察設計來喚醒第一個,喚醒第二個,喚醒第三個。 notify函數通知線程檢查該條件。

我的答案是比特定代碼更高的水平,但我希望這有助於。

0

您考慮的情況可能會發生。如果在調用notify_all()時喚醒了工作線程(從屬),那麼他們可能會錯過該信號。

防止這種情況的一種方法是在cv.notify_all()之前鎖定mtx,然後解鎖。如wait()文檔中建議的那樣,鎖被用作pred()訪問的保護。如果主線程獲得mtx,則其他線程不會在同一時刻檢查條件。雖然他們當時可能正在做其他工作,但在您的代碼中,他們不太可能再次輸入wait