Drew是對的,我認爲ConcurrentQueue即使聽起來很完美,但實際上是BlockingCollection使用的底層數據結構。對我來說,似乎也非常重要。 查看本書的第7章* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 它將解釋如何使用BlockingCollection,並讓多個生產者和多個消費者各自脫離「隊列」。你會想看看「GetConsumingEnumerable()」方法,可能只是調用.ToObservable()。
*本書的其餘部分相當平均。
編輯:
這裏是一個示例程序,我認爲做你想要的?
class Program
{
private static ManualResetEvent _mre = new ManualResetEvent(false);
static void Main(string[] args)
{
var theQueue = new BlockingCollection<string>();
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));
LoadQueue(theQueue, "Producer A");
LoadQueue(theQueue, "Producer B");
LoadQueue(theQueue, "Producer C");
_mre.Set();
Console.WriteLine("Processing now....");
Console.ReadLine();
}
private static void ProcessNewValue(string value, string consumerName, int delay)
{
Thread.SpinWait(delay);
Console.WriteLine("{1} consuming {0}", value, consumerName);
}
private static void LoadQueue(BlockingCollection<string> target, string prefix)
{
var thread = new Thread(() =>
{
_mre.WaitOne();
for (int i = 0; i < 100; i++)
{
target.Add(string.Format("{0} {1}", prefix, i));
}
});
thread.Start();
}
}
您能詳細說明您期望Rx如何幫助您嗎? – 2010-11-10 08:28:25
@Richard Szalay - 正如我在接近尾聲時所提到的,我的想法是,我不必輪詢查看是否有任何東西在隊列中,我可以在有東西放在那裏時做出反應,所以如果有大量項目突然推入,我可能有幾個線程正在處理。我試圖避免投票,這就是我現在正在做的事情。 – 2010-11-10 13:22:01