2009-12-02 46 views
0

我對於無鎖數據結構很陌生,所以對於我寫的(我希望函數爲)有界無鎖deque(不調整大小,只是想讓基函數工作)的練習。我只是想從知道自己在做什麼的人那裏得到一些確認,說明我是否有正確的想法和/或如何改進這一點。Win32中的無鎖Deque C++

class LocklessDeque 
{ 
    public: 

    LocklessDeque() : m_empty(false), 
         m_bottom(0), 
         m_top(0) {} 


    ~LocklessDeque() 
    { 
     // Delete remaining tasks 
     for(unsigned i = m_top; i < m_bottom; ++i) 
     delete m_tasks[i]; 
    } 


    void PushBottom(ITask* task) 
    { 
     m_tasks[m_bottom] = task; 

     InterlockedIncrement(&m_bottom); 
    } 


    ITask* PopBottom() 
    { 
     if(m_bottom - m_top > 0) 
     { 
     m_empty = false; 

     InterlockedDecrement(&m_bottom); 

     return m_tasks[m_bottom]; 
     } 

     m_empty = true; 

     return NULL; 
    } 


    ITask* PopTop() 
    { 
     if(m_bottom - m_top > 0) 
     { 
     m_empty = false; 

     InterlockedIncrement(&m_top); 

     return m_tasks[m_top]; 
     } 

     m_empty = true; 

     return NULL; 
    } 


    bool IsEmpty() const 
    { 
     return m_empty; 
    } 

    private: 

    ITask* m_tasks[16]; 

    bool m_empty; 

    volatile unsigned m_bottom; 
    volatile unsigned m_top; 

}; 
+0

我並不想過於嚴格,但如果您對鎖定免費數據結構非常認真,那麼您應該查找所有可以找到的鏈接。 Deque並不是一個容易開始的地方,誠實地說,從這個外觀來看,您可能想要回到更簡單的數據結構並開始工作。從這裏開始:http://www.boyet.com/articles/lockfreestack.html有一整套系列。 – 2009-12-02 20:46:40

回答

3

看着這個我認爲這將是一個問題:

void PushBottom(ITask* task) 
{ 
    m_tasks[m_bottom] = task; 
    InterlockedIncrement(&m_bottom); 
} 

如果這是在實際的多線程環境下使用,我覺得設置m_tasks[m_bottom]時,你會碰撞。想想如果你有兩個線程同時嘗試這樣做會發生什麼 - 你不能確定哪一個實際設置了m_tasks [m_bottom]。

檢出this article這是一個關於無鎖隊列的合理討論。

+0

感謝您的回覆。你會推薦我做這樣的事嗎? ... InterlockedExchangePointer(m_tasks + m_bottom,task); InterlockedIncrement(&m_bottom); – Reggie 2009-12-02 19:37:24

+0

@Reggie - no。問題是你仍然無法檢測到碰撞並處理它。無鎖容器的目的不是沒有/便宜的碰撞 - 這個想法是,你設置它,所以你很少碰撞和非碰撞便宜 - 但是當你碰撞時,你可以處理它。這聽起來像你沒有設置適當的測試設備。我要做的第一件事是建立一個測試裝置,用一堆線程在容器上加磅。 – Aaron 2009-12-02 20:02:37

2

您使用m_bottomm_top成員的索引數組是沒關係。您可以使用InterlockedXxxx()的返回值來獲取安全索引。您將需要丟失IsEmpty(),但在多線程場景中永遠不會準確。 PopXxx中的空檢查同樣存在問題。我看不出如何在沒有互斥的情況下完成這項工作。

0

解決這一問題所指出的阿龍,我會做一些事情,如:

void PushBottom(ITask *task) { 
    int pos = InterlockedIncrement(&m_bottom); 
    m_tasks[pos] = task; 
} 

同樣,彈出:

ITask* PopTop() { 
    int pos = InterlockedIncrement(&m_top); 
    if (pos == m_bottom) // This is still subject to a race condition. 
     return NULL; 
    return m_tasks[pos]; 
} 

我會消除從設計既m_emptyIsEmpty()完全。 IsEmpty返回的結果受到競爭條件的影響,這意味着當您查看該結果時,它可能已過時(即,當您查看它返回的內容時,它告訴您關於隊列的信息可能是錯誤的)。同樣,m_empty只提供了沒有它的信息記錄,這是生成陳舊數據的一個配方。使用m_empty並不能保證它不能正常工作,但它確實增加了錯誤的機率,IMO。

我猜這是由於代碼的臨時性質,但現在你也有一些嚴重的問題與數組邊界。你沒有做任何事情來強制你的數組索引環繞,所以只要你試圖將第17個任務推到隊列中,就會遇到一個主要問題。

編輯:我應該指出,評論中提到的競爭條件非常嚴重 - 並且它也不是唯一的。雖然比原來有所好轉,但應該將而不是誤認爲可以正常工作的代碼。

我想說寫入正確的無鎖代碼要比編寫使用鎖定的正確代碼困難得多。我不知道誰沒有對確實使用鎖定的代碼有深入的瞭解。基於原始代碼,我認爲最好先編寫和理解使用鎖的隊列的代碼,並且只有當你使用它來更好地理解真正涉及的問題時嘗試使用無鎖代碼。

+0

這有很大的缺陷 - 有人試圖在同一時間彈出窗口,最終可能會在實際填入m_tasks [pos]之前彈出陳舊值。 – Aaron 2009-12-02 20:14:43

+0

聯鎖增量沒有幫助嗎?是否可以輕鬆避免這個問題? – Reggie 2009-12-02 20:20:36

+0

@Reggie - 輕鬆?不,這就是爲什麼無鎖數據結構如此之難。 – Aaron 2009-12-02 20:33:02

1

做這樣幾乎不可能的事情的關鍵是使用InterlockedCompareExchange。(這是Win32使用的名稱,但任何支持多線程的平臺都會有InterlockedCompareExchange等效項)。

的想法是,你做出結構的副本(必須足夠小,以執行單位讀取(64或如果你能處理一些unportability,在x86上128位)。

你讓另一個副本與你的建議的更新,做你的邏輯和更新的副本,那麼您在使用InterlockedCompareExchange更新「真實」的結構。什麼InterlockedCompareExchange所做的是,原子確保該值仍是您開始使用狀態更新前的值,如果它是還是那個值,原子更新與新狀態的值通常這被包裝在一個無限循環不停地嘗試,直到別人沒有改變在中間值以下是大致的模式:。

union State 
{ 
    struct 
    { 
     short a; 
     short b; 
    }; 
    uint32_t volatile rawState; 
} state; 

void DoSomething() 
{ 
    // Keep looping until nobody else changed it behind our back 
    for (;;) 
    { 
     state origState; 
     state newState; 

     // It's important that you only read the state once per try 
     origState.rawState = state.rawState; 
     // This must copy origState, NOT read the state again 
     newState.rawState = origState.rawState; 

     // Now you can do something impossible to do atomically... 
     // This example takes a lot of cycles, there is huge 
     // opportunity for another thread to come in and change 
     // it during this update 
     if (newState.b == 3 || newState.b % 6 != 0) 
     { 
      newState.a++; 
     } 

     // Now we atomically update the state, 
     // this ONLY changes state.rawState if it's still == origState.rawState 
     // In either case, InterlockedCompareExchange returns what value it has now 
     if (InterlockedCompareExchange(&state.rawState, newState.rawState, 
       origState.rawState) == origState.rawState) 
      return; 
    } 
} 

(請原諒,如果上面的代碼不實際編譯 - 我寫了我的頭頂部)

大。現在,您可以輕鬆實現無鎖算法。錯誤!麻煩的是,您可以自動更新的數據量受到嚴格限制。

一些無鎖算法使用的技術,他們「幫助」併發線程。例如,假設你有一個鏈接列表,你希望能夠從多個線程中更新,其他線程可以通過執行「第一個」和「最後一個」指針的更新來「幫助」,如果他們正在通過並看到它們是在由「last」指向的節點處,但節點中的「next」指針不爲null。在這個例子中,當注意到「最後一個」指針是錯誤的時,它們會更新最後一個指針,只要它仍然指向當前節點,使用互鎖比較交換。

不要落入陷阱,你「旋轉」或環(如自旋鎖)。雖然短暫旋轉是有價值的,因爲你期望「其他」線程完成某些事情 - 它們可能不會。 「其他」線程可能已被上下文切換,並且可能不再運行。你只是在吃CPU時間,燃燒電力(可能殺死一臺筆記本電腦電池),直到情況成真。當你開始旋轉的時候,你不妨鎖定你的無鎖代碼並用鎖來寫。鎖比無限旋轉要好。想要從艱難到荒謬的過程中,考慮一下你可以使用其他體系結構的混亂情況 - 在x86/x64上,一般都很寬容,但是當你進入其他「弱排序」體系結構時,你會進入領域事情發生的地方沒有任何意義 - 內存更新不會按照程序順序進行,因此,所有關於其他線程正在做什麼的心理推理都會消失。 (即使是在x86/x64有一個名爲它經常被用來更新顯存,但可用於任何內存緩衝區的硬件,你需要圍欄時「寫入合併」的存儲器類型)的架構要求您使用「記憶柵欄」操作,以保證所有讀/寫/兩者都將在全局可見的情況下(由其他內核執行)。寫入柵欄保證柵欄之前的任何寫入在柵欄之後的任何寫入之前將全局可見。讀圍欄將保證在圍欄之前沒有讀數會在圍欄之前被推測性地執行。讀/寫柵欄(又名全柵欄或內存柵欄)將作出保證。柵欄非常昂貴。 (有些使用術語「屏障」,而不是「籬笆」)

我的建議是用鎖/條件變量第一個實現它。如果你在完成這項工作時遇到任何問題,那麼嘗試執行無鎖實現是沒有希望的。並始終衡量,衡量,衡量。您可能會發現使用鎖的實現的性能非常好 - 沒有一些片狀無鎖實現的不確定性,只有當您向重要客戶進行演示時纔會顯示natsy掛起錯誤。也許你可以通過將原始問題重新定義爲更容易解決的問題來解決這個問題,也許可以通過重組工作來讓更大的物品(或批次物品)進入集合,從而減輕整個事物的壓力。編寫無鎖定的併發算法非常困難(正如你在其他地方寫過的,我確信)。這通常也是不值得的。