2010-10-06 79 views
9

msdn文檔指出靜態通用隊列是線程安全的。這是否意味着下面的代碼是線程安全的?換句話說,當一個線程入隊一個int和另一個線程同時出隊一個int時有問題嗎?爲了線程安全,我必須鎖定Enqueue和Dequeue操作嗎?這是使用靜態隊列線程安全的嗎?

class Test { 
    public static Queue<int> queue = new Queue<int>(10000); 

    Thread putIntThread; 
    Thread takeIntThread; 

    public Test() { 
     for(int i = 0; i < 5000; ++i) { 
      queue.Enqueue(0); 
     } 
     putIntThread = new Thread(this.PutInt); 
     takeIntThread = new Thread(this.TakeInt); 
     putIntThread.Start(); 
     takeIntThread.Start(); 
    } 

    void PutInt() { 
     while(true) 
     { 
      if(queue.Count < 10000) {//no need to lock here as only itself can change this condition 
       queue.Enqueue(0); 
      } 
     } 
    } 

    void TakeInt() { 
     while(true) { 
      if(queue.Count > 0) {//no need to lock here as only itself can change this condition 
       queue.Dequeue(); 
      } 
     } 
    } 

} 

編輯:我必須使用.NET 3.5

+0

我注意到你不想鎖定的評論,因爲它已經很慢並且鎖定會讓它變慢。如果這個代碼已經很慢,那麼相比之下,鎖定的開銷會很小。鎖定速度通常很快,但人們認爲它非常慢。我曾經看到人們在沒有進行任何測量的情況下創建了精美的讀寫器鎖。當被迫這樣做時,花哨的人往往比較慢。做簡單的.net鎖定,然後通過配置文件找出你很慢的地方 – pm100 2010-10-06 23:01:35

回答

23

這絕對不是線程安全的。從Queue<T>的文檔。

此類型的公共靜態(Visual Basic中的Shared)成員是線程安全的。任何實例成員不保證是線程安全的。

A Queue<T>可以同時支持多個閱讀器,只要該集合未被修改即可。即便如此,枚舉集合本質上不是一個線程安全的過程。爲了確保枚舉期間的線程安全性,您可以在整個枚舉過程中鎖定集合。爲了讓集合可以被多個線程讀取和寫入,您必須實現自己的同步。

重讀你的問題,你似乎對「這種類型的靜態成員」這個短語感到困惑 - 它不是在談論「靜態隊列」,因爲沒有這種東西。一個對象不是靜態的 - 一個成員。當它談到靜態成員時,它正在討論諸如Encoding.GetEncodingQueue<T>實際上沒有任何靜態成員)。實例成員是類似於EnqueueDequeue的東西 - 與類型實例相關的成員,而不是類型本身。

因此,無論您需要爲每個操作使用鎖還是使用.NET 4,都可以使用ConcurrentQueue<T>

+1

每當我讀到你的答案時,我都會學到新的東西!感謝您來到SO。 :) – Nayan 2010-10-06 08:38:32

+0

謝謝。的確,我誤解了「這種類型的靜態成員」。 – blizpasta 2010-10-06 09:04:33

1

是的,你必須鎖定就像MSDN

要允許由多個線程用於讀取和寫入訪問的集合,則必須實現自己的同步。

2

MSDN聲明的是,Queue的靜態方法是線程安全的,而不是靜態實例的實例方法是線程安全的。

+1

對不起,這張貼在Jon的答案的第1版和第2版之間。 – Timores 2010-10-06 08:53:22

+0

沒有必要道歉,這不是問題。 – blizpasta 2010-10-06 09:04:09

+0

不是一個真正的問題,但是當我開始打字時,我的答案有一定的價值。當我完成時,Jon已經更新了他的帖子,而我自己的帖子已經不再有任何價值了.-) – Timores 2010-10-06 10:16:39

4

是的,正如這裏所說的,一個靜態實例的實例成員與靜態成員不同,它只是保證線程安全的後者,所以你必須鎖定入隊和出隊操作。

如果鎖定被證明是一個瓶頸,隊列是簡單的收藏品之一在無鎖的方式來寫,只要一不還需要通過Queue<T>提供全面落實ICollection<T>

internal sealed class LockFreeQueue<T> 
{ 
    private sealed class Node 
    { 
    public readonly T Item; 
    public Node Next; 
    public Node(T item) 
    { 
     Item = item; 
    } 
    } 
    private volatile Node _head; 
    private volatile Node _tail; 
    public LockFreeQueue() 
    { 
    _head = _tail = new Node(default(T)); 
    } 
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked 
    public void Enqueue(T item) 
    { 
    Node newNode = new Node(item); 
    for(;;) 
    { 
     Node curTail = _tail; 
     if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail. 
     { 
     Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. 
     return; 
     } 
     else 
     { 
     Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. 
     } 
    } 
    }  
    public bool TryDequeue(out T item) 
    { 
    for(;;) 
    { 
     Node curHead = _head; 
     Node curTail = _tail; 
     Node curHeadNext = curHead.Next; 
     if (curHead == curTail) 
     { 
     if (curHeadNext == null) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
      Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread 
     } 
     else 
     { 
     item = curHeadNext.Item; 
     if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) 
     { 
      return true; 
     } 
     } 
    } 
    } 
#pragma warning restore 420 
} 

該隊列只有一個EnqueueTryDequeue(如果隊列爲空,則返回false)方法。使用聯鎖增量和減量來添加計數屬性並不重要(確保在實際屬性中計數字段是不穩定地讀取的),但是除此之外,添加任何不能寫爲委託給一個成員已經定義,或在施工過程中發生(在這種情況下,您將只有一個線程在這一點上使用它,除非你做了一些非常奇怪的事情)。

該實現也是免等待的,就好像一個線程的動作不會阻止另一個線程進展(如果一個線程在第二個線程嘗試這樣做時通過入隊過程的一半,則第二個線程將完成第一個線程的工作)。儘管如此,我還是等到鎖定實際上證明了一個瓶頸(除非你只是在試驗;玩弄異國情調,與熟悉的工作)。事實上,在許多情況下,這會比鎖定Queue<T>更昂貴,尤其是因爲它不太適合將內容彼此靠近保存在內存中,所以您可能會發現由於這個原因,大量的操作接連不斷。只要不存在頻繁的鎖爭用,鎖定通常相當便宜。

編輯:

我現在有時間在上述工作方式上添加註釋。我通過閱讀別人的同一個想法的版本來寫這篇文章,寫這個給我自己來複制這個想法,然後和我之後閱讀的版本進行比較,並發現它是一個非常豐富的練習。

讓我們從非鎖定的免費實現開始。這是一個單獨鏈接的列表。

internal sealed class NotLockFreeYetQueue<T> 
{ 
    private sealed class Node 
    { 
    public readonly T Item; 
    public Node Next{get;set;} 
    public Node(T item) 
    { 
     Item = item; 
    } 
    } 
    private Node _head; 
    private Node _tail; 
    public NotLockFreeYetQueue() 
    { 
    _head = _tail = new Node(default(T)); 
    } 
    public void Enqueue(T item) 
    { 
    Node newNode = new Node(item); 
    _tail.Next = newNode; 
    _tail = newNode; 
    } 
    public bool TryDequeue(out T item) 
    { 
     if (_head == _tail) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
     { 
     item = _head.Next.Item; 
     _head = _head.Next; 
     return true; 
     } 
    } 
} 

到目前爲止的實施情況的一些注意事項。

ItemNext可合理地爲字段或屬性。因爲它是一個簡單的內部類,其中一個必須是readonly而另一個是「啞」讀寫(在getter或setter中沒有邏輯),在這裏實際上沒有太多選擇。我在這裏設置了Next這是一個屬性,純粹是因爲這個功能稍後不會起作用,當我們到達那裏時我想談談。

_head_tail開始爲指向一個哨兵,而不是通過null不必對空隊列的特殊情況簡化了操作。

因此,排隊將創建一個新節點,並在成爲新尾部之前將其設置爲_tailNext屬性。出列將檢查是否爲空,如果不爲空,則從頭節點獲取值並將頭設置爲舊頭的屬性爲Next的節點。

此時需要注意的另一件事是,由於新節點是根據需要創建的,而不是預先分配的數組,因此在正常使用中,它的性能會比Queue<T>低。這不會變得更好,而且我們現在要做的所有事情都會使單線程性能變差。再一次,只有在激烈的爭論中,這將打敗鎖定的Queue<T>

讓我們使鎖入鎖。我們將使用Interlocked.CompareExchange()。這將第一個參數與第三個參數進行比較,如果它們相等,則將第一個參數設置爲第二個參數。無論如何它會返回舊值(不管它是否被覆蓋)。比較和交換是作爲一個原子操作完成的,所以本身就是線程安全的,但是我們需要更多的工作來使這些操作的組合也是線程安全的。

CompareExchange和其它語言的等價物有時簡稱CAS(用於比較並交換)。

使用它們的一種常見方式是在循環中,我們首先通過正常讀取獲得我們將重寫的值(請記住,.NET讀取的32位值,較小的值和引用類型總是原子的),以及嘗試覆蓋它,如果它沒有改變,循環,直到我們成功了:

private sealed class Node 
{ 
    public readonly T Item; 
    public Node Next; 
    public Node(T item) 
    { 
    Item = item; 
    } 
} 
/* ... */ 
private volatile Node _tail; 
/* ... */ 
public void Enqueue(T item) 
{ 
    Node newNode = new Node(item); 
    for(;;) 
    { 
    Node curTail = _tail; 
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) 
    { 
     _tail = newNode; 
     return; 
    } 
    } 
} 

我們希望添加到尾部的Next只有當它的null - 如果不是因爲寫它的另一個線程。所以,我們做一個只有在這種情況下才能成功的CAS。如果是,我們設置_tail爲新節點,否則我們再試一次。

下,就必須改變是一個字段這個工作,我們不能與性能做到這一點。我們還製作_tailvolatile,以便_tail在所有CPU高速緩存中都是新鮮的(CompareExchange具有易失性語義,因此不會因缺乏波動性而被破壞,但它可能會比所需的頻率更頻繁地運行, _tail也)。

這是無鎖的,但沒有等待。如果一個線程和CAS一樣遠,但還沒有寫入_tail,並且一段時間沒有任何CPU時間,那麼所有其他嘗試排隊的線程都將保持循環,直到它被調度並設法執行爲止。如果線程長時間被中止或暫停,這會導致一種永久活鎖。

所以,如果我們在中科院已經失敗的情況下,我們是在這樣的情況。我們可以通過執行其他線程的工作是解決這一問題:

for(;;) 
    { 
    Node curTail = _tail; 
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) 
    { 
     Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. 

     return; 
    } 
    else 
    { 
     Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. 
    } 
    } 

現在,在大多數情況下寫信給curTail.Next線程將新節點分配給_tail - 而是通過CAS的情況下,它已經完成。但是,另一個線程無法寫入curtail.Next,它可以嘗試將curTail.Next指定爲_tail,以完成第一個線程的工作並開始自己的工作。

因此,一個無鎖,免等待入隊。有時間去離職。首先讓我們考慮一下我們不懷疑排隊空的情況。就像排隊一樣,我們將首先獲取我們感興趣的節點的本地副本; _head_tail_head.Next(再次不使用空頭部或尾部空隊列使生活更輕鬆,這意味着它是安全的閱讀在任何狀態下_head.Next)。也像入隊,我們將取決於波動,這時候不只是_tail,但_head,所以我們將其更改爲:

private volatile Node _head; 

我們改變TryDequeue到:

public bool TryDequeue(out T item) 
    { 
     Node curHead = _head; 
     Node curTail = _tail; 
     Node curHeadNext = curHead.Next; 
     if (_head == _tail) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
     { 
     item = curHeadNext.Item; 
     if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) 
      return true; 
     } 
    } 

空 - 現場案件現在是不正確的,但我們會回到那個。它是安全的項目設置爲curHeadNext.Item因爲如果我們沒有完成,我們將再次覆蓋它的操作,但我們必須使操作寫入_head原子,並保證只要_head沒有改變的情況發生。如果沒有,那麼_head已被另一個線程更新,我們可以再次循環(無需爲該線程工作,它已經完成了所有會影響我們的任務)。

現在考慮如果_head == _tail會發生什麼情況。可能它是空的,但可能_tail.Next(這將與curHeadNext相同)是由入隊寫入的。在這種情況下,我們更可能想要的不是空虛的結果,而是我們將部分排隊的項目出隊的結果。所以,我們協助該線程,並再次繼續循環:

if (curHead == curTail) 
{ 
    if (curHeadNext == null) 
    { 
     item = default(T); 
     return false; 
    } 
    else 
     Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); 
} 

最後,唯一剩下的問題是,我們不斷收到420個警告,因爲我們傳遞揮發性領域byref方法。這往往會阻止易失性語義(因此警告),但不會與CompareExchange(因此我們這樣做)。我們可以禁用警告,包括解釋我們爲什麼這麼做的評論(我嘗試不要在沒有正當評論的情況下禁用警告),並且我們已經提供了前面給出的代碼。

請注意,我們在GC支持框架中這樣做很重要。如果我們不得不處理重新分配,它將變得更加複雜。

+0

+1:謝謝!你似乎能夠閱讀我的想法,從我的問題推斷出,如果可能的話,我想避免鎖定。我目前的代碼沒有鎖,我想確認我對msdn文檔的(錯誤)解釋。它已經太慢了,添加鎖會讓它更慢。如果我無法提高速度,我可能會最終嘗試您的建議,儘管我必須承認我還不瞭解您的代碼(這是我的工作內容)。 – blizpasta 2010-10-06 15:52:20

+0

我很樂意解釋上面的代碼,雖然也許它與原始主題相距甚遠,並且在其他地方最好。我會非常**懷疑這會有所幫助。鎖並不昂貴;有爭議的鎖可能是昂貴的(有時是非常殘酷的),但沒有爭議的鎖不會成爲瓶頸(當然,當它們被爭議時恰恰是你應該感到高興的時候)。同時,由於'Queue '的工作方式,從CPU頻繁訪問它可能比我在這裏得到的代碼更快,因爲... – 2010-10-06 15:58:11

+0

...項目在內存中彼此更接近,因此更可能在CPU緩存中。另外一個簡單的讀寫操作(在隊列中使用)比我課上使用的互鎖操作要快。在無爭議的情況下,我希望'隊列'擊倒我的課,即使增加了鎖。所以,雖然上述*可能會有用,但我只會認爲,如果高鎖定爭用被證明是一個真正的問題(或爲了嘗試新的做事方式的樂趣,這就是爲什麼我有上述的手[基於在閱讀其他代碼時,不是100%原創]) – 2010-10-06 16:03:33