您可以結合使用Plinq(與WithDegreeOfParallelism()
限制第一階段的併發性)以及BlockingCollection完成的塊。另請注意,它使用AsOrdered()
來保留原始順序。
以下示例演示。對於您的實際應用程序,您需要將此處顯示的int
工作項目替換爲您的工作項目類型 - 文件名或類別以及與每個工作項目有關的信息。
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static void Main()
{
int maxThreads = 4;
int maxOutputQueueSize = 10;
var workItems = Enumerable.Range(1, 100); // Pretend these are your files
var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
var worker = Task.Run(() => output(outputQueue));
var parallelWorkItems =
workItems
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(maxThreads)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(process);
foreach (var item in parallelWorkItems)
outputQueue.Add(item);
outputQueue.CompleteAdding();
worker.Wait();
Console.WriteLine("Done.");
}
static int process(int value) // Pretend that this compresses the data.
{
Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
Thread.Sleep(250); // Simulate slow operation.
return value; // Return updated work item.
}
static void output(BlockingCollection<int> queue)
{
foreach (var item in queue.GetConsumingEnumerable())
Console.WriteLine($"Output is processing {item}");
Console.WriteLine("Finished outputting.");
}
}
}
注意如何可以限制兩個輸入隊列處理(經由WithDegreeOfParallelism
)和輸出隊列的大小(與maxOutputQueueSize
參數)。
另外,如果你使用的是.Net 4.5或更高版本,你可以看看TPL Dataflow library,它對這種事情有很多支持。如果可以的話,我會推薦使用它 - 但在這裏的答案中描述太多了。
似乎PLINQs AsOrdered和AsSequential是爲你設計的 - 你認爲PLINQ嗎? –
我在研究時確實看到它出現在討論中,但我看不到任何機制來約束處理但未發送數據的中間隊列的大小 - 所以在我的情況下,如果壓縮速度非常快,網絡速度很慢,那麼就沒有什麼可以告訴PLINQ在步驟1上回踩。也許我錯過了一個微妙的選擇(特別是因爲PLINQ對我來說是非常新的!) – medconn