2015-10-04 105 views
3

我有一個無鎖定單生產者多個消費者隊列,使用std::atomics以類似於香草服務器CPPCon2014的方式實現。condition_variable在無鎖實現中無互斥鎖

有時,生產者過於緩慢,無法供給所有消費者,因此消費者可能會捱餓。我想阻止飢餓的消費者在隊列中砰砰地跳,因此我爲10ms添加了睡眠。這個值是任意的並且不是最優的。我希望使用一個信號,消費者可以在隊列中有空閒插槽後再發送給製作人。在基於鎖的實現中,我自然會使用std::condition_variable來執行此任務。但是現在在我的無鎖實現中,我不確定,如果它是引入mutex的正確設計選擇,那麼只能使用std::condition_variable

我只是想問你,如果在這種情況下mutex是正確的路?

編輯:我有一個生產者,從來沒有睡覺。還有多個消費者,如果他們餓死,他們就會入睡。因此整個系統總是在進步,因此我認爲它是無鎖的。 我目前的解決辦法是要做到這一點,在消費者中的GetData功能:
std::unique_lock<std::mutex> lk(_idleMutex); _readSetAvailableCV.wait(lk);

這在生產者線程一旦新的數據已經準備好:
_readSetAvailableCV.notify_all();

+1

帶有信號的睡眠線程不是鎖定的,非常符合定義。你知道鎖定免費主要是關於某些保證aboit線程調度和進度,而不是aboit「更快」,對嗎?你需要什麼鎖定免費保證? – Yakk

+0

我不認爲我理解。如果是飢餓的消費者,爲什麼他們在隊列中有空閒插槽時通知製片人? – Davislor

+0

但是,如果你確實需要一個帶有多個編寫器的變量,這意味着至少有一個編寫器準備好使用更新,那聽起來就像是一個原子標誌。 – Davislor

回答

0

我剛看了關於併發TS這CPPCON視頻: Artur Laksberg @cppcon2015

某處在這次談話中間阿圖爾解釋到底如何我的問題可以用障礙和鎖存器來解決。他還以我的方式顯示了使用condition_variable的現有解決方法。他強調了用於此目的的condition_variable的某些弱點,例如在進入等待之前虛假喚醒和丟失通知信號。 但是,在我的應用程序中,這些限制是沒有問題的,所以我現在認爲,我將使用我在我的文章的編輯中提到的解決方案 - 直到鎖存器/攔截器可用。 感謝大家的評論。

0

一個與並行線程可能是一個飢餓的線程休眠與pause()並醒來與SIGCONT。每個線程都有自己的awake標誌。如果任何線程在生產者發佈新輸入時處於睡眠狀態,請使用pthread_kill()將其喚醒。

+0

我想要有可移植的C++ 11代碼 – bodzcount

+0

C++ 11的等價物是'condition_variable :: notify_one()'。 – Davislor

1

如果你的大多數線程都只是等待爲生產者排隊的資源,我沒那麼肯定無鎖的實現甚至值得的努力。大多數時候,你的線程會睡覺,他們不會爲隊列鎖而互相爭鬥。

這就是爲什麼我認爲(從你提供的數據量),改變一切與互斥鎖+ conditional_variable一起工作就好了。當生產者排隊資源時,它僅通知一個線程(使用notifiy_one())並釋放隊列鎖定。鎖定隊列的使用者,如果隊列再次爲空,則將隊列出隊並返回到睡眠狀態。線程之間不應該有任何真正的「摩擦」(如果你的製作者很慢),所以我會去做。

+0

我同意你的意見。在我使用互斥體實現之前,線程之間沒有太大的摩擦。不過,我想將它轉換爲無鎖設計,只是爲了練習:) 我測量了速度,與之前大致相同。 PS:取決於硬盤的系統和速度,有時消費者速度更快,有時甚至是生產商 – bodzcount

+0

使代碼變得更糟似乎不是有用的做法。 –

0

只需使用最少的設計更改,您就可以簡單地使用信號量。每當產品推入隊列時,信號量就會開始變空,並被提升。消費者在從隊列中彈出之前首先嚐試降低信號量。

雖然可以用互斥量,條件變量和計數器來模擬C++ 11,但它並未提供信號量實現。

如果您確實想要在生產者比消費者更快的情況下執行無鎖定行爲,則可以使用雙重檢查鎖定。

/* producer */ 
bool was_empty = q.empty_lock_free(); 
q.push_lock_free(x); 
if (was_empty) { 
    scoped_lock l(q.lock()); 
    if (!q.empty()) { 
     q.cond().signal(); 
    } 
} 

/* consumers */ 
for (;;) { 
    if (q.empty_lock_free()) { 
     scoped_lock l(q.lock()); 
     while (q.empty()) { 
      q.cond().wait(); 
     } 
     x = q.pop(); 
     if (!q.empty()) { 
      q.cond().signal(); 
     } 
    } else { 
     try { 
      x = q.pop_lock_free(); 
     } catch (empty_exception) { 
      continue; 
     } 
     break; 
    } 
}