2013-03-29 55 views
0

我想有一種隊列,其中一個源輸入數據,而另一邊會有消費者等待,當他們檢測到隊列不爲空時開始執行數據直到它們停止。但是重要的是,如果隊列被清空,他們仍然會繼續觀看隊列,以便如果有更多數據彈出,他們將能夠使用它。我發現的multiple consumer and multiple producers作爲消費者嵌套在生產者,在我的情況下,我不能這樣做,因爲我將有一個單一的來源和消費者承諾排隊,直到我阻止他們。因此不是串聯的,而是消費者和生產者並行執行。單源生產者並行工作的多個併發者

將xecutig消費者和

Parallel.Invoke(() => producer(),() => consumers()); 

的問題,因此是我怎麼會執行隊列這有時並行

+3

有你看着'BlockingCollection '?你可以創建一個正確的類型,並將它傳遞給'producer()'和'consumers()'。 –

+0

@MthetheWWatson將會做 –

+0

您需要閱讀文檔,但是您將從每個消費者線程調用'GetConsumingEnumerable()',並將其與'foreach'一起使用。生產者線程會將東西添加到集合中,並在完成生產時調用'CompleteAdding()'。這將自動使所有消費者線程退出他們的foreach循環。 –

回答

0

空的內容並行生產者可以解決這個問題相對容易使用BlockingCollection<T>

您可以使用一個作爲隊列,並將其引用傳遞給producer()和每個consumers()

您將從每個消費者線程調用GetConsumingEnumerable(),並使用它與foreach

生產者線程會將項目添加到集合中,並在完成生產任務時調用CompleteAdding()。這將自動使所有消費者線程退出他們的foreach循環。

這裏是一個基本的例子(沒有錯誤處理)。對Thread.Sleep()的調用是模擬負載,不應以實際代碼使用。

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    internal class Program 
    { 
     private static void Main(string[] args) 
     { 
      ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code. 
      var plant = new ProcessingPlant(); 
      plant.Process(); 
      Console.WriteLine("Work complete."); 
     } 
    } 

    public sealed class ProcessingPlant 
    { 
     private readonly BlockingCollection<string> _queue = new BlockingCollection<string>(); 

     public void Process() 
     { 
      Parallel.Invoke(producer, consumers); 
     } 

     private void producer() 
     { 
      for (int i = 0; i < 100; ++i) 
      { 
       string item = i.ToString(); 
       Console.WriteLine("Producer is queueing {0}", item); 
       _queue.Add(item); // <- Here's where we add an item to the queue. 
       Thread.Sleep(0); 
      } 

      _queue.CompleteAdding(); // <- Here's where we make all the consumers 
     }       // exit their foreach loops. 

     private void consumers() 
     { 
      Parallel.Invoke(
       () => consumer(1), 
       () => consumer(2), 
       () => consumer(3), 
       () => consumer(4), 
       () => consumer(5) 
      ); 
     } 

     private void consumer(int id) 
     { 
      Console.WriteLine("Consumer {0} is starting.", id); 

      foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items. 
      { 
       Console.WriteLine("Consumer {0} read {1}", id, item); 
       Thread.Sleep(0); 
      } 

      Console.WriteLine("Consumer {0} is stopping.", id); 
     } 
    } 
} 

(我知道這是使用額外的線程剛開始消費者,但我就是這麼做的,以避免混淆真正的問題 - 這是演示如何使用BlockingCollection的)

相關問題