2014-02-11 69 views
5

是否有辦法批量阻塞集合中的項目集合。 E.G.BlockingCollection <T>使用TPL進行批處理DataFlow

我有一個消息總線出版商調用 blockingCollection.Add()

並且是這樣創造了一個消費線程:

Task.Factory.StartNew(() => 
     { 
      foreach (string value in blockingCollection.GetConsumingEnumerable()) 
       { 
        Console.WriteLine(value); 
       } 
     }); 

但是,我只希望控制檯阻塞後寫集合上有10個項目,而GetConsumingEnumerable()總是在添加每個項目後觸發。我可以爲此編寫自己的隊列,但如果可能,我想使用阻塞集合?

+0

Just append Take(10) –

+2

@HansPassant:但是這會導致超過10日的物品不能被處理......? – Jon

回答

3

不知道是什麼項目要求是,但我建議TPL DataFlow BatchBlock

您將實例化一個BatchBlock<string>,將其綁定到ActionBlock<string>然後發佈到批處理塊。

的僞代碼可能是這個樣子:

var bb = new BatchBlock<string>(10); 
var ab = new ActionBlock<string[]>(msgArray=>{ 
    foreach(var msg in msgArray) 
     Console.Writeline(msg); 
}); 

bb.LinkTo(ab); 

foreach (string value in blockingCollection.GetConsumingEnumerable()) 
{ 
     bb.Post(value); 
} 

使用數據流,你可能甚至想用BufferBlock更換BlockingCollection或只是張貼到緩衝塊的情況下直接先加入到阻止收集,因爲批塊已經是線程安全的。

+0

謝謝德米特里..剛剛在bb.LinkTo(ab)的類型轉換問題在這裏。將BatchBlock 轉換爲ISourceBlock的問題 ... –

+0

我的不好。 ActionBlock 必須是ActionBlock ,因爲它從批處理塊接收到一個字符串數組。查看更新後的答案 – Dimitri

+1

這個標記正確,因爲我需要用TPL DataFlow學習新內容!謝謝德米特里。 –

4

一個快速的解決辦法是這樣的

public class ConsoleQueue 
{ 
    private readonly List<string> _values = new List<string>(); 

    public void FlushQueueIfFull() 
    { 
     if (_values.Count < 10) return; 
     foreach (var value in _values) 
     { 
      Console.WriteLine(value); 
     } 
     _values.Clear(); 
    } 

    public void Push(string message) 
    { 
     _values.Add(message); 
     FlushQueueIfFull(); 
    } 
} 

那麼你可以使用它像這樣

 var queue = new ConsoleQueue(); 

     Task.Factory.StartNew(() => 
     { 
      foreach (string value in blockingCollection.GetConsumingEnumerable()) 
      { 
       queue.Push(value); 
      } 
     }); 

您可以輕鬆地擴展它來掩蓋線程安全等

+1

+1 - 另一種選擇是使用DataFlow BatchBlock,它已經實現了線程安全批處理。 – Dimitri

+1

謝謝 - 這工作完美.. –