我試圖圍繞BlockingCollection和我的生產者/消費者問題圍繞我的頭。生產者/消費者,BlockingCollection,並正在等待更改
我想達到的目標,如下:
- 各種各樣的線程安全隊列在FIFO方式持有對象的列表(「工作」)。
- 第二個線程安全隊列,它也以FIFO方式保存這些作業的結果列表。
換句話說:
Inbound "Job" Data, can come at any time from multiple threads
==> Thread-Safe FIFO Queue 1 "FQ1"
==> Async Processing of data in FQ1 (and remove item from FQ1)
==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
==> Async Processing of data in FQ2 (and remove item from FQ2)
==> Done
我卑微的嘗試至今都:
private BlockingCollection<InboundObject> fq1;
private BlockingCollection<ResultObject> fq2;
(...)
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject>
}
一個我選擇BlockingCollection是因爲我想保持負荷降到最低的原因,意義只有當物品實際處於集合內部時才工作(而不處理等待/睡眠)。我不確定foreach是否正確。
請讓我知道這是否正確,或者如果有更好的方法。謝謝!
編輯 我可以從單元測試中知道任務內部的工作實際上是同步的。新版本如下:
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
Task.Factory.StartNew(async() => { fq2.Add(await a.DoWork()); });
}
輸入非常感謝!
如果.net4.5是一個選項,你無法比看TPL DataFlow做得更好。從BufferBlock(一個線程安全的異步隊列)開始並向外工作。 http://www.microsoft.com/en-us/download/details.aspx?id=14782 – spender