2017-05-01

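Which parallel async approach is most appropriate?

I have a method that takes an image's filename, processes the image (CPU-intensive), and then uploads it to blob storage (async IO). Here is an outline of the method: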

public async Task<ImageJob> ProcessImage(String fileName) { 

    Byte[] imageBytes = await ReadFileFromDisk(fileName).ConfigureAwait(false); // IO-bound 

    Byte[] processedImage = RunFancyAlgorithm(imageBytes); // CPU-bound 

    Uri blobUri = await this.azureBlobClient.UploadBlob(processedImage).ConfigureAwait(false); // IO-bound 

    return new ImageJob(blobUri); 
} 

Another part of my program receives a list of thousands of filenames to process.

What is the most appropriate way to call my ProcessImage method so that it makes the most of the available IO and CPU capacity?

I have identified seven different ways (so far) of calling my method, but I'm not sure which is best:

String[] fileNames = GetFileNames(); // typically contains thousands of filenames 

// Approach 1: 
{ 
    List<Task<ImageJob>> tasks = fileNames 
     .Select(fileName => ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 

// Approach 2: 
{ 
    List<Task<ImageJob>> tasks = fileNames 
     .Select(async fileName => await ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 

// Approach 3: 
{ 
    List<Task> tasks = new List<Task>(); 
    foreach(String fileName in fileNames) 
    { 
     Task imageTask = ProcessImage(fileName); 
     tasks.Add(imageTask); 
    } 

    await Task.WhenAll(tasks); 
} 

// Approach 4 (weirdly, this gives me warning CS4014: "Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.")
// ...even though I don't use an async lambda in the previous 3 examples either, so why is Parallel.ForEach so special?
{ 
    ParallelLoopResult parallelResult = Parallel.ForEach(fileNames, fileName => ProcessImage(fileName)); 
} 

// Approach 5: 
{ 
    ParallelLoopResult parallelResult = Parallel.ForEach(fileNames, async fileName => await ProcessImage(fileName)); 
} 

// Approach 6: 
{ 
    List<Task<ImageJob>> tasks = fileNames 
     .AsParallel() 
     .Select(fileName => ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 

// Approach 7: 
{ 
    List<Task<ImageJob>> tasks = fileNames 
     .AsParallel() 
     .Select(async fileName => await ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 

Stop mixing async/await and `Parallel.ForEach`: they are incompatible, and combining them makes you produce an `async void` function. You need to use [a TPL alternative](https://msdn.microsoft.com/en-us/library/dd460717(v=vs.110).aspx).
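The `async void` point can be seen in a small self-contained sketch. `SimulatedWorkAsync` is a hypothetical stand-in for `ProcessImage`, and the 500 ms delay is an arbitrary assumption chosen to make the difference visible. Because `Parallel.ForEach` only accepts an `Action<T>`, the async lambda compiles to an `async void` delegate, so the loop returns as soon as each iteration reaches its first `await`, whereas `Task.WhenAll` waits for every task:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for ProcessImage: awaits some IO-like delay, then reports completion.
    private static async Task SimulatedWorkAsync(Action onDone)
    {
        await Task.Delay(500);
        onDone();
    }

    // Parallel.ForEach takes Action<int>, so this async lambda is async void:
    // ForEach returns once every iteration has hit its first await, not when the work is done.
    public static int CompletedAfterParallelForEach(int count)
    {
        int completed = 0;
        Parallel.ForEach(Enumerable.Range(0, count), async _ =>
            await SimulatedWorkAsync(() => Interlocked.Increment(ref completed)));
        return Volatile.Read(ref completed);
    }

    // Task.WhenAll observes every task, so all work has finished when the await resumes.
    public static async Task<int> CompletedAfterWhenAll(int count)
    {
        int completed = 0;
        IEnumerable<Task> tasks = Enumerable.Range(0, count)
            .Select(_ => SimulatedWorkAsync(() => Interlocked.Increment(ref completed)));
        await Task.WhenAll(tasks);
        return Volatile.Read(ref completed);
    }

    public static async Task Main()
    {
        Console.WriteLine($"After Parallel.ForEach: {CompletedAfterParallelForEach(10)} of 10 done");
        Console.WriteLine($"After Task.WhenAll:     {await CompletedAfterWhenAll(10)} of 10 done");
    }
}
```

The first line typically reports few or none of the items as finished (the exact number depends on timing), while the second always reports all 10. An exception thrown inside the `async void` lambda also cannot be observed by the caller of `Parallel.ForEach`.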


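Regarding the warning message (Approach 4), see here for an explanation: https://blogs.msdn.microsoft.com/ericlippert/2010/11/11/asynchrony-in-c-5-part-six-whither-async/. In short, the `async` keyword is what enables the `await` keyword: if you want to use `await`, your method must be `async`. That does not mean you don't need to `await` a task just because your method is not `async`.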


@KhanhTO I know, but for some reason the compiler warns me in Approach 4 and not in Approach 1, even though the lambda is the same. – Dai
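A plausible explanation for the difference (an assumption, not confirmed in the thread) is the delegate type the identical lambda is converted to. `Enumerable.Select` expects a `Func<string, TResult>`, so `TResult` is inferred as the task type and the task is handed back to the caller; `Parallel.ForEach` expects an `Action<string>`, so the task produced by the call is silently discarded, which is the situation CS4014 describes. A minimal sketch, with `ProcessAsync` as a hypothetical stand-in for `ProcessImage`:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class DelegateTypeDemo
{
    // Hypothetical stand-in for ProcessImage.
    public static async Task<int> ProcessAsync(string fileName)
    {
        await Task.Yield();
        return fileName.Length;
    }

    public static async Task Main()
    {
        // The shape Enumerable.Select expects: the lambda's Task<int> is returned and can be awaited.
        Func<string, Task<int>> kept = fileName => ProcessAsync(fileName);

        // The shape Parallel.ForEach expects: the same lambda compiles,
        // but the Task it produces is thrown away and can never be awaited.
        Action<string> discarded = fileName => ProcessAsync(fileName);

        int[] lengths = await Task.WhenAll(new[] { "a.png", "bb.png" }.Select(kept));
        Console.WriteLine(string.Join(",", lengths)); // prints 5,6

        discarded("c.png"); // fire-and-forget: nothing here observes completion or exceptions
    }
}
```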

Answer


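It sounds like you have many items that all need to be processed in exactly the same way. As @StephenCleary mentioned, TPL Dataflow is great for this type of problem. A good introduction can be found here. The simplest way to start is with just a few blocks, with your main TransformBlock executing ProcessImage. Here's a simple example to get you started: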

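using System; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow NuGet package 

// GetFileNames, ReadFileFromDisk, RunFancyAlgorithm and azureBlobClient are the members from the question. 
public class ImageProcessor { 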

    private TransformBlock<string, ImageJob> imageProcessor; 
    private ActionBlock<ImageJob> handleResults; 

    public ImageProcessor() { 
     var options = new ExecutionDataflowBlockOptions() { 
      BoundedCapacity = 1000, 
      MaxDegreeOfParallelism = Environment.ProcessorCount 
     }; 
     imageProcessor = new TransformBlock<string, ImageJob>(fileName => ProcessImage(fileName), options); 
     handleResults = new ActionBlock<ImageJob>(job => HandleResults(job), options); 
     imageProcessor.LinkTo(handleResults, new DataflowLinkOptions() { PropagateCompletion = true });   
    } 

    public async Task RunData() { 
     var fileNames = GetFileNames(); 
     foreach (var fileName in fileNames) { 
      await imageProcessor.SendAsync(fileName); 
     } 
     //all data passed into pipeline 
     imageProcessor.Complete(); 
     //completion propagates to handleResults; await the last block so every result has been handled 
     await handleResults.Completion; 
    } 

    private async Task<ImageJob> ProcessImage(string fileName) { 
     //Each of these steps could also be separated into discrete blocks 

     var imageBytes = await ReadFileFromDisk(fileName).ConfigureAwait(false); // IO-bound 

     var processedImage = RunFancyAlgorithm(imageBytes); // CPU-bound 

     var blobUri = await this.azureBlobClient.UploadBlob(processedImage).ConfigureAwait(false); // IO-bound 

     return new ImageJob(blobUri); 
    } 

    private void HandleResults(ImageJob job) { 
     //do something with results 
    } 
} 
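The `//Each of these steps could also be separated into discrete blocks` comment can be sketched as a three-stage pipeline, so that the IO-bound stages and the CPU-bound stage each get their own degree of parallelism. This assumes the `System.Threading.Tasks.Dataflow` package; `ReadAsync`, `Transform` and `UploadAsync` are hypothetical stand-ins for `ReadFileFromDisk`, `RunFancyAlgorithm` and `UploadBlob`, and the limits (16 concurrent IO operations, a capacity of 100 per block) are arbitrary assumptions to tune:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SplitPipeline
{
    // Hypothetical stand-ins for ReadFileFromDisk, RunFancyAlgorithm and UploadBlob.
    private static async Task<byte[]> ReadAsync(string fileName)
    {
        await Task.Delay(10);
        return new byte[fileName.Length];
    }

    private static byte[] Transform(byte[] bytes) => bytes.Select(b => (byte)~b).ToArray();

    private static async Task<Uri> UploadAsync(byte[] bytes)
    {
        await Task.Delay(10);
        return new Uri($"https://example.invalid/blob/{bytes.Length}");
    }

    public static async Task<int> RunAsync(string[] fileNames)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        // IO-bound stages: many operations in flight at once (16 is an assumed value).
        var ioOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16, BoundedCapacity = 100 };
        // CPU-bound stage: roughly one worker per core.
        var cpuOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, BoundedCapacity = 100 };

        var read = new TransformBlock<string, byte[]>(fileName => ReadAsync(fileName), ioOptions);
        var process = new TransformBlock<byte[], byte[]>(bytes => Transform(bytes), cpuOptions);
        var upload = new TransformBlock<byte[], Uri>(bytes => UploadAsync(bytes), ioOptions);
        int uploaded = 0;
        var handle = new ActionBlock<Uri>(uri => Interlocked.Increment(ref uploaded));

        read.LinkTo(process, link);
        process.LinkTo(upload, link);
        upload.LinkTo(handle, link);

        foreach (var fileName in fileNames)
            await read.SendAsync(fileName); // waits while the first block is full (back-pressure)

        read.Complete();
        await handle.Completion; // wait for the last block, not the first
        return uploaded;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(new[] { "a.png", "b.png" })); // prints 2
    }
}
```

Because every block has a `BoundedCapacity`, a slow upload stage eventually makes `SendAsync` wait instead of letting thousands of processed images pile up in memory.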