空的內容並行生產者可以解決這個問題相對容易使用BlockingCollection<T>
。
您可以使用一個作爲隊列,並將其引用傳遞給producer()
和每個consumers()
。
您將從每個消費者線程調用GetConsumingEnumerable()
,並使用它與foreach
。
生產者線程會將項目添加到集合中,並在完成生產任務時調用CompleteAdding()
。這將自動使所有消費者線程退出他們的foreach循環。
這裏是一個基本的例子(沒有錯誤處理)。對Thread.Sleep()
的調用是模擬負載,不應以實際代碼使用。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
private static void Main(string[] args)
{
ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
var plant = new ProcessingPlant();
plant.Process();
Console.WriteLine("Work complete.");
}
}
public sealed class ProcessingPlant
{
private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
public void Process()
{
Parallel.Invoke(producer, consumers);
}
private void producer()
{
for (int i = 0; i < 100; ++i)
{
string item = i.ToString();
Console.WriteLine("Producer is queueing {0}", item);
_queue.Add(item); // <- Here's where we add an item to the queue.
Thread.Sleep(0);
}
_queue.CompleteAdding(); // <- Here's where we make all the consumers
} // exit their foreach loops.
private void consumers()
{
Parallel.Invoke(
() => consumer(1),
() => consumer(2),
() => consumer(3),
() => consumer(4),
() => consumer(5)
);
}
private void consumer(int id)
{
Console.WriteLine("Consumer {0} is starting.", id);
foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
{
Console.WriteLine("Consumer {0} read {1}", id, item);
Thread.Sleep(0);
}
Console.WriteLine("Consumer {0} is stopping.", id);
}
}
}
(我知道這是使用額外的線程剛開始消費者,但我就是這麼做的,以避免混淆真正的問題 - 這是演示如何使用BlockingCollection的)
有你看着'BlockingCollection'?你可以創建一個正確的類型,並將它傳遞給'producer()'和'consumers()'。 –
@MthetheWWatson將會做 –
您需要閱讀文檔,但是您將從每個消費者線程調用'GetConsumingEnumerable()',並將其與'foreach'一起使用。生產者線程會將東西添加到集合中,並在完成生產時調用'CompleteAdding()'。這將自動使所有消費者線程退出他們的foreach循環。 –