2011-05-20 78 views
1

我正在尋找關於如何寫入MP/MC隊列以便無鎖或甚至無需等待的文檔。我使用.Net 4.0。發現了很多C++代碼,但我對內存模型不是很熟悉,所以在移植到C#時有很大的機會引入一些錯誤。多生產者多消費者無鎖(甚至等待)隊列

+0

此線程可能是一個開始:http://groups.google.com/group/comp.programming.threads/ browse_thread/thread/87b343bf5cd1dc46 – dlev 2011-05-20 22:57:04

+0

關於這個話題有一本很好的Java書:Java Concurreny in Practice。所有的代碼示例都可以在他們的[網站] [1]上找到,但是,如果你不熟悉Java併發框架,那麼如果沒有這本書,代碼可能很難理解。 [1]:http://www.javaconcurrencyinpractice.com – Stefan 2011-05-20 23:00:25

回答

3

爲什麼你認爲你需要無鎖隊列?您是否嘗試過使用ConcurrentQueue<T>,可能被封閉在BlockingCollection<T>之內?

編寫多線程代碼很困難。編寫無鎖代碼更難,除非你真的必須自己動手,否則不應該自己動手編寫代碼。

+0

我正在編寫通用子系統,所以我只想盡可能地做到這一點。 – adontz 2011-05-20 23:08:39

+0

@adontz,在這種情況下,你應該讓你的系統可擴展:你提供了一些併發隊列的實現,這在大多數情況下足夠好,但是讓用戶在需要時自行編寫它們。 – svick 2011-05-20 23:18:47

+0

不幸的是(對我來說),主要用例是一個高負載,所以無鎖不會超過「足夠好」。我預計不會實現一個默認的等待空閒隊列,但基於鎖定的解決方案是不可接受的。我不是自己的敵人,至少需要實現一個無鎖隊列。 – adontz 2011-05-20 23:40:43

1

我第一次去ConcurrentQueue<T>,但是您可以將數據存儲抽象到一個接口後面,以便您輕鬆更改實現。然後基準典型場景並查看遇到問題的位置。請記住:過早優化是萬惡之源。設計您的系統,使其不與實現綁定,而是與合同綁定,然後您可以優化您的實現。

我看了一下ConcurrentQueue<T>與ILSpy似乎是一個無鎖的實施乍一看 - 這麼好的機會,這正是你正在尋找。

+0

ConcurrentQueue 只是一塊MPMC隊列。它無助於等待新貨到貨,通知新貨到達。 我認爲一個AutoResetEvent就足夠了,它模擬了不同的情況:生產者在某個時刻比消費者工作得更快,消費者在某個時刻工作得比生產者快。虛擬喚醒也是一個問題,線程不應該爲了查看空隊列而醒來。所以有很多地方犯錯誤。 我通常知道多線程,但絕對不是這方面的專業人士。 – adontz 2011-05-21 07:58:57

+0

@adontz,這是'BlockingCollection '將幫助你。它負責爲消費者等待新的物品,並等待隊列中有空閒空間,如果你選擇限制其大小,對於生產者。 – svick 2011-05-21 10:46:09

0

作爲一個選項考慮,有一個算法the bounded Multiple Producer Multiple Consumer queue by Dmitry Vyukov。我已經將算法移植到.NET,你可以找到the sources on github。速度非常快。

的排隊算法:

public bool TryEnqueue(object item) 
{ 
    do 
    { 
     var buffer = _buffer; // prefetch the buffer pointer 
     var pos = _enqueuePos; // fetch the current position where to enqueue the item 
     var index = pos & _bufferMask; // precalculate the index in the buffer for that position 
     var cell = buffer[index]; // fetch the cell by the index 
     // If its sequence wasn't touched by other producers 
     // and we can increment the enqueue position 
     if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos) 
     { 
      // write the item we want to enqueue 
      Volatile.Write(ref buffer[index].Element, item); 
      // bump the sequence 
      buffer[index].Sequence = pos + 1; 
      return true; 
     } 

     // If the queue is full we cannot enqueue and just return false 
     if (cell.Sequence < pos) 
     { 
      return false; 
     } 

     // repeat the process if other producer managed to enqueue before us 
    } while (true); 
} 

出隊算法:

public bool TryDequeue(out object result) 
{ 
    do 
    { 
     var buffer = _buffer; // prefetch the buffer pointer 
     var bufferMask = _bufferMask; // prefetch the buffer mask 
     var pos = _dequeuePos; // fetch the current position from where we can dequeue an item 
     var index = pos & bufferMask; // precalculate the index in the buffer for that position 
     var cell = buffer[index]; // fetch the cell by the index 
     // If its sequence was changed by a producer and wasn't changed by other consumers 
     // and we can increment the dequeue position 
     if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos) 
     { 
      // read the item 
      result = Volatile.Read(ref cell.Element); 
      // update for the next round of the buffer 
      buffer[index] = new Cell(pos + bufferMask + 1, null); 
      return true; 
     } 

     // If the queue is empty return false 
     if (cell.Sequence < pos + 1) 
     { 
      result = default(object); 
      return false; 
     } 

     // repeat the process if other consumer managed to dequeue before us 
    } while (true); 
} 
相關問題