2010-06-13 12 views
5

我意識到,當我嘗試使用多個線程處理併發隊列中的項目時,多個線程可以將項目放入其中,理想的解決方案是使用Reactive Extensions與併發數據結構。如何在ConcurrentQueue或ConcurrentStack中使用IObservable/IObserver

我原來的問題是:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

所以我很好奇,如果有什麼辦法可以有作爲的項目投入它會連續出列一個LINQ(PLINQ或)查詢。

我試圖讓這個工作的方式,我可以有n個生產者推入隊列和有限數量的線程來處理,所以我不會超載數據庫。

如果我可以使用Rx框架,那麼我希望我可以開始它,並且如果100個項目被放置在100ms內,那麼作爲PLINQ查詢的一部分的20個線程將僅通過隊列進行處理。

有三種技術,我想一起工作:

  1. 的Rx框架(無LINQ)
  2. PLING
  3. System.Collections.Concurrent 結構
+0

您能詳細說明您期望Rx如何幫助您嗎? – 2010-11-10 08:28:25

+0

@Richard Szalay - 正如我在接近尾聲時所提到的,我的想法是,我不必輪詢查看是否有任何東西在隊列中,我可以在有東西放在那裏時做出反應,所以如果有大量項目突然推入,我可能有幾個線程正在處理。我試圖避免投票,這就是我現在正在做的事情。 – 2010-11-10 13:22:01

回答

3

我不不知道如何用Rx完成這項工作,但我建議只使用BlockingCollection<T>producer-consumer pattern。您的主線程將項目添加到集合中,默認情況下,該集合使用下面的ConcurrentQueue<T>。然後,您有一個單獨的Task,您在使用Parallel::ForEach而不是BlockingCollection<T>之前旋轉起來,以便從系列中同時處理多個項目,以便同時處理系統。現在,您可能還需要考慮使用ParallelExtensions庫的GetConsumingPartitioner方法,以便提高效率,因爲在這種情況下,默認分區程序會創建比您想要的更多的開銷。你可以從this blog post瞭解更多。

當主線程完成你的BlockingCollection<T>Task::WaitCompleteAddingTask你紡最多等待所有消費者處理完集合中的所有項目。

+0

使用'BlockingCollection'的主要方法是消耗線程塊。一個可觀察的模式只會在需要處理的時候佔用線程。 – 2014-03-21 18:49:32

6

Drew是對的,我認爲ConcurrentQueue即使聽起來很完美,但實際上是BlockingCollection使用的底層數據結構。對我來說,似乎也非常重要。 查看本書的第7章* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 它將解釋如何使用BlockingCollection,並讓多個生產者和多個消費者各自脫離「隊列」。你會想看看「GetConsumingEnumerable()」方法,可能只是調用.ToObservable()。

*本書的其餘部分相當平均。

編輯:

這裏是一個示例程序,我認爲做你想要的?

class Program 
{ 
    private static ManualResetEvent _mre = new ManualResetEvent(false); 
    static void Main(string[] args) 
    { 
     var theQueue = new BlockingCollection<string>(); 
     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000)); 


     LoadQueue(theQueue, "Producer A"); 
     LoadQueue(theQueue, "Producer B"); 
     LoadQueue(theQueue, "Producer C"); 

     _mre.Set(); 

     Console.WriteLine("Processing now...."); 

     Console.ReadLine(); 
    } 

    private static void ProcessNewValue(string value, string consumerName, int delay) 
    { 
     Thread.SpinWait(delay); 
     Console.WriteLine("{1} consuming {0}", value, consumerName); 
    } 

    private static void LoadQueue(BlockingCollection<string> target, string prefix) 
    { 
     var thread = new Thread(() => 
            { 
             _mre.WaitOne(); 
             for (int i = 0; i < 100; i++) 
             { 
              target.Add(string.Format("{0} {1}", prefix, i)); 
             } 
            }); 
     thread.Start(); 
    } 
} 
+0

這實際上是......巧妙的人......連接Rx和BlockingCollection。哇...你甚至可以用這個東西做一個管道:https://msdn.microsoft.com/en-us/library/ff963548.aspx – Oooogi 2017-02-07 09:29:41