2015-12-21 82 views
3

我正在尋找一種「整齊」且高效的方式來實現長步驟1(可能是並行)和後面的步驟2的組合按照原始順序(並且如果可能的話,儘量減少RAM中第一步的數據量),同時允許第一步對象的步驟1中的數據可用時啓動第二步,另外還有步驟2以獲得更多數據。我需要壓縮大量的圖像(慢 - 步驟1),然後通過網絡連接按順序發送(步驟2)。爲了使它更加清晰,我需要壓縮大量圖像(慢 - 步驟1)。在任何階段限制準備好的壓縮數據在RAM中的塊數量也很重要,例如,如果發送1000張圖像,我想限制「已完成」但未發送圖像的數量爲(比如)線程數/處理器使用。`Parallel.ForEach`按照定義的順序最後一步

我已經做了一個「手寫」版本,使用任務對象數組,但它看起來很雜亂,我確信其他人必須有類似的需求,所以有更多的「標準」這樣做的方式?理想情況下,我希望Parallel.ForEach有兩個代表的變體 - 一個代表步驟1,另一個代表步驟2,我希望其中一個標準覆蓋例如包含「localFinal」參數的標準覆蓋可能會有所幫助,但在事實證明,最後階段是「每個線程」,而不是「每個代表」。

任何人都可以用現有的整潔方式指向我來實現這個目標嗎?

+2

似乎PLINQs AsOrdered和AsSequential是爲你設計的 - 你認爲PLINQ嗎? –

+0

我在研究時確實看到它出現在討論中,但我看不到任何機制來約束處理但未發送數據的中間隊列的大小 - 所以在我的情況下,如果壓縮速度非常快,網絡速度很慢,那麼就沒有什麼可以告訴PLINQ在步驟1上回踩。也許我錯過了一個微妙的選擇(特別是因爲PLINQ對我來說是非常新的!) – medconn

回答

1

您可以結合使用Plinq(與WithDegreeOfParallelism()限制第一階段的併發性)以及BlockingCollection完成的塊。另請注意,它使用AsOrdered()來保留原始順序。

以下示例演示。對於您的實際應用程序,您需要將此處顯示的int工作項目替換爲您的工作項目類型 - 文件名或類別以及與每個工作項目有關的信息。

using System; 
using System.Collections.Concurrent; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    static class Program 
    { 
     static void Main() 
     { 
      int maxThreads = 4; 
      int maxOutputQueueSize = 10; 
      var workItems = Enumerable.Range(1, 100); // Pretend these are your files 
      var outputQueue = new BlockingCollection<int>(maxOutputQueueSize); 
      var worker = Task.Run(() => output(outputQueue)); 

      var parallelWorkItems = 
       workItems 
       .AsParallel() 
       .AsOrdered() 
       .WithDegreeOfParallelism(maxThreads) 
       .WithMergeOptions(ParallelMergeOptions.NotBuffered) 
       .Select(process); 

      foreach (var item in parallelWorkItems) 
       outputQueue.Add(item); 

      outputQueue.CompleteAdding(); 
      worker.Wait(); 

      Console.WriteLine("Done."); 
     } 

     static int process(int value) // Pretend that this compresses the data. 
     { 
      Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}"); 
      Thread.Sleep(250); // Simulate slow operation. 
      return value; // Return updated work item. 
     } 

     static void output(BlockingCollection<int> queue) 
     { 
      foreach (var item in queue.GetConsumingEnumerable()) 
       Console.WriteLine($"Output is processing {item}"); 

      Console.WriteLine("Finished outputting."); 
     } 
    } 
} 

注意如何可以限制兩個輸入隊列處理(經由WithDegreeOfParallelism)和輸出隊列的大小(與maxOutputQueueSize參數)。

另外,如果你使用的是.Net 4.5或更高版本,你可以看看TPL Dataflow library,它對這種事情有很多支持。如果可以的話,我會推薦使用它 - 但在這裏的答案中描述太多了。

相關問題