1

我試圖圍繞BlockingCollection和我的生產者/消費者問題圍繞我的頭。生產者/消費者,BlockingCollection,並正在等待更改

我想達到的目標,如下:

  • 各種各樣的線程安全隊列在FIFO方式持有對象的列表(「工作」)。
  • 第二個線程安全隊列,它也以FIFO方式保存這些作業的結果列表。

換句話說:

Inbound "Job" Data, can come at any time from multiple threads 
    ==> Thread-Safe FIFO Queue 1 "FQ1" 
     ==> Async Processing of data in FQ1 (and remove item from FQ1) 
     ==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2" 
      ==> Async Processing of data in FQ2 (and remove item from FQ2) 
       ==> Done 

我卑微的嘗試至今都:

private BlockingCollection<InboundObject> fq1; 
private BlockingCollection<ResultObject> fq2; 

(...) 

Task.Factory.StartNew(() => 
{ 
    foreach (InboundObject a in fq1.GetConsumingEnumerable()) 
     a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject> 
} 

一個我選擇BlockingCollection是因爲我想保持負荷降到最低的原因,意義只有當物品實際處於集合內部時才工作(而不處理等待/睡眠)。我不確定foreach是否正確。

請讓我知道這是否正確,或者如果有更好的方法。謝謝!

編輯 我可以從單元測試中知道任務內部的工作實際上是同步的。新版本如下:

Task.Factory.StartNew(() => 
{ 
    foreach (InboundObject a in fq1.GetConsumingEnumerable()) 
     Task.Factory.StartNew(async() => { fq2.Add(await a.DoWork()); }); 
} 

輸入非常感謝!

+2

如果.net4.5是一個選項,你無法比看TPL DataFlow做得更好。從BufferBlock(一個線程安全的異步隊列)開始並向外工作。 http://www.microsoft.com/en-us/download/details.aspx?id=14782 – spender

回答

1

我選擇BlockingCollection的原因之一是因爲我希望將負載保持在最低限度,這意味着只有在物品實際位於集合內時才能工作(而不處理等待/睡眠)。我不確定foreach是否正確。

這是正確的方法,foreach將被阻塞,直到新的項目將被添加到隊列或CompleteAdding方法將被調用。不正確的是你想用BlockingCollection實現異步處理。 BlockingCollection是一個簡單的生產者/消費者隊列,必須在需要維護作業和作業結果處理的順序時使用。因爲它是同步的。工作將按照添加的順序進行處理。

如果您只需要異步執行,則不需要隊列。在這種情況下,您可以使用TPL,爲每個作業創建一個新任務,他們將在內部由TPL排隊,並將使用盡可能多的OS線程,因爲系統可以高效地處理這些線程。例如,你的工作可以產生他們自己的任務。這是更靈活的方法。

另外,可以使用生產者/消費者隊列來組織管道作業的執行。在這種情況下,工作必須分成幾個步驟。每個步驟都必須由專用線程執行。在每個作業步驟線程中,我們必須從一個隊列中讀取作業,執行此作業,並將其排入下一個隊列。

interface IJob 
{ 
    void Step1(); 
    void Step2(); 
    ... 
} 

var step1 = new BlockingCollection<IJob>(); 
var step2 = new BlockingCollection<IJob>(); 
... 

Task.Factory.StartNew(() => 
    { 
     foreach(var step in step1.GetConsumingEnumerable()) { 
      step.Step1(); 
      step2.Add(step); 
     } 
    }); 

Task.Factory.StartNew(() => 
    { 
     foreach(var step in step2.GetConsumingEnumerable()) { 
      // while performing Step2, another thread can execute Step1 
      // of the next job 
      step.Step2(); 
      step3.Add(step); 
     } 
    }); 

在這種情況下,作業將按先進先出順序執行,但並行執行。 但是如果你想做流水線處理,你首先要考慮負載均衡。如果其中一個步驟花費太多時間,則隊列將變大,而其他線程大部分時間都將空閒。