真正的代碼方式更復雜,但我認爲我設法制作了一個mcve。C++線程安全和notify_all()
我試圖做到以下幾點:
- 有一些線程做的工作
- 把他們都進入暫停狀態
- 喚醒他們第一次,等待它完成,再醒來第二個,等待它完成,喚醒了第三個。等等。
我使用的代碼下面,它似乎工作
std::atomic_int which_thread_to_wake_up;
std::atomic_int threads_asleep;
threads_asleep.store(0);
std::atomic_bool ALL_THREADS_READY;
ALL_THREADS_READY.store(false);
int threads_num = .. // Number of threads
bool thread_has_finished = false;
std::mutex mtx;
std::condition_variable cv;
std::mutex mtx2;
std::condition_variable cv2;
auto threadFunction = [](int my_index) {
// some heavy workload here..
....
{
std::unique_lock<std::mutex> lck(mtx);
++threads_asleep;
cv.notify_all(); // Wake up any other thread that might be waiting
}
std::unique_lock<std::mutex> lck(mtx);
bool all_ready = ALL_THREADS_READY.load();
size_t index = which_thread_to_wake_up.load();
cv.wait(lck, [&]() {
all_ready = ALL_THREADS_READY.load();
index = which_thread_to_wake_up.load();
return all_ready && my_index == index;
});
// This thread was awaken for work!
.. do some more work that requires synchronization..
std::unique_lock<std::mutex> lck2(mtx2);
thread_has_finished = true;
cv2.notify_one(); // Signal to the main thread that I'm done
};
// launch all the threads..
std::vector<std::thread> ALL_THREADS;
for (int i = 0; i < threads_num; ++i)
ALL_THREADS.emplace_back(threadFunction, i);
// Now the main thread needs to wait for ALL the threads to finish their first phase and go to sleep
std::unique_lock<std::mutex> lck(mtx);
size_t how_many_threads_are_asleep = threads_asleep.load();
while (how_many_threads_are_asleep < threads_num) {
cv.wait(lck, [&]() {
how_many_threads_are_asleep = threads_asleep.load();
return how_many_threads_are_asleep == numThreads;
});
}
// At this point I'm sure ALL THREADS ARE ASLEEP!
// Wake them up one by one (there should only be ONE awake at any time before it finishes his computation)
for (int i = 0; i < threads_num; i++)
{
which_thread_to_wake_up.store(i);
cv.notify_all(); // (*) Wake them all up to check if they're the chosen one
std::unique_lock<std::mutex> lck2(mtx2);
cv2.wait(lck, [&]() { return thread_has_finished; }); // Wait for the chosen one to finish
thread_has_finished = false;
}
恐怕最後notify_all()
調用(一個我標有(*))可能會導致以下情況:
- 所有線程都睡着了
- 所有線程都從喚醒主線程通過調用
notify_all()
- 具有正確索引的線程完成最後一次計算並釋放鎖
- 所有其他線程■找被喚醒,但他們有沒有選中原子變量YET
- 主線程發出第二
notify_all()
,這都丟失了(因爲線程都驚醒的是,他們並沒有簡單地檢查了原子能還)
這會發生嗎?我找不到notify_all()
的任何措辭,如果它的電話是緩衝或與實際檢查條件變量的函數同步的順序。
我沒有仔細研究過你的代碼,但是你描述的一般情況是可能的。條件變量不存儲信號。只有那些在發送信號時等待條件變量的線程才能看到該信號。由信號喚醒的線程不再等待,即使它們還沒有從'wait()'函數返回。 –