我正在尋找關於如何寫入MP/MC隊列以便無鎖或甚至無需等待的文檔。我使用.Net 4.0。發現了很多C++代碼,但我對內存模型不是很熟悉,所以在移植到C#時有很大的機會引入一些錯誤。多生產者多消費者無鎖(甚至等待)隊列
回答
爲什麼你認爲你需要無鎖隊列?您是否嘗試過使用ConcurrentQueue<T>
,可能被封閉在BlockingCollection<T>
之內?
編寫多線程代碼很困難。編寫無鎖代碼更難,除非你真的必須自己動手,否則不應該自己動手編寫代碼。
我第一次去ConcurrentQueue<T>
,但是您可以將數據存儲抽象到一個接口後面,以便您輕鬆更改實現。然後基準典型場景並查看遇到問題的位置。請記住:過早優化是萬惡之源。設計您的系統,使其不與實現綁定,而是與合同綁定,然後您可以優化您的實現。
我看了一下ConcurrentQueue<T>
與ILSpy似乎是一個無鎖的實施乍一看 - 這麼好的機會,這正是你正在尋找。
ConcurrentQueue
@adontz,這是'BlockingCollection
作爲一個選項考慮,有一個算法the bounded Multiple Producer Multiple Consumer queue by Dmitry Vyukov。我已經將算法移植到.NET,你可以找到the sources on github。速度非常快。
的排隊算法:
public bool TryEnqueue(object item)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var pos = _enqueuePos; // fetch the current position where to enqueue the item
var index = pos & _bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence wasn't touched by other producers
// and we can increment the enqueue position
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
// write the item we want to enqueue
Volatile.Write(ref buffer[index].Element, item);
// bump the sequence
buffer[index].Sequence = pos + 1;
return true;
}
// If the queue is full we cannot enqueue and just return false
if (cell.Sequence < pos)
{
return false;
}
// repeat the process if other producer managed to enqueue before us
} while (true);
}
出隊算法:
public bool TryDequeue(out object result)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var bufferMask = _bufferMask; // prefetch the buffer mask
var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
var index = pos & bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence was changed by a producer and wasn't changed by other consumers
// and we can increment the dequeue position
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
// read the item
result = Volatile.Read(ref cell.Element);
// update for the next round of the buffer
buffer[index] = new Cell(pos + bufferMask + 1, null);
return true;
}
// If the queue is empty return false
if (cell.Sequence < pos + 1)
{
result = default(object);
return false;
}
// repeat the process if other consumer managed to dequeue before us
} while (true);
}
- 1. 生產者 - 消費者多個生產者多個隊列單個消費者
- 2. 鎖定免費隊列 - 單個生產者,多個消費者
- 3. C++中的無鎖多生產者多個消費者
- 4. C++中是否存在多生產者單消費者無鎖隊列?
- 5. Fober et al無鎖定FIFO隊列:多個消費者和生產者?
- 6. C#等待生產者/消費者中的多個事件
- 7. 幾乎無鎖生產者消費者
- 8. 生產者/消費者工作隊列
- 9. 生產者消費者阻止隊列
- 10. 消費者過濾的生產者 - 消費者阻塞隊列
- 11. 單個生產者多個消費者 - 隊列包含null
- 12. 隊列爲多個生產者和消費者
- 13. 同步生產者,消費者和生產者隊列
- 14. 生產者中的死鎖消費者C#有界隊列
- 15. 並行雙鏈表 - 多生產者/消費者FIFO隊列 - 死鎖
- 16. 消費者生產者多線程消費者不會消逝
- 17. ActiveMQ一個生產者多消費者
- 18. 多線程生產者/消費者
- 19. 多線程生產者/消費者
- 20. 多生產者/消費者績效
- 21. 多個生產者,單個消費者
- 22. 雙排隊的消費者生產者
- 23. 生產者/消費者死鎖多線程Java
- 24. JAVA - 併發 - 多生產者/多消費者請求和響應隊列
- 25. Java生產者 - 消費者:生產者不「通知()」消費者
- 26. 生產者/消費者,BlockingCollection,並正在等待更改
- 27. 消費者/生產者在兩端等待
- 28. 生產者消費者併發性沒有等待POSIX
- 29. 生產者/消費者使用等待和通知
- 30. 生產者/消費者
此線程可能是一個開始:http://groups.google.com/group/comp.programming.threads/ browse_thread/thread/87b343bf5cd1dc46 – dlev 2011-05-20 22:57:04
關於這個話題有一本很好的Java書:Java Concurreny in Practice。所有的代碼示例都可以在他們的[網站] [1]上找到,但是,如果你不熟悉Java併發框架,那麼如果沒有這本書,代碼可能很難理解。 [1]:http://www.javaconcurrencyinpractice.com – Stefan 2011-05-20 23:00:25