8

如果你需要運行多個並行異步I/O任務,但需要確保不超過X I/O進程在同時運行;並且預處理和後處理I/O處理任務不應具有這樣的限制。如何正確運行多個異步任務並行?

這裏有一個場景 - 假設有1000個任務;它們每個都接受一個文本字符串作爲輸入參數;轉換文本(預處理I/O)然後將轉換後的文本寫入文件。其目標是使預處理邏輯利用100%的CPU /內核和I/O部分以最大10度並行度運行的任務(同時打開最多10個用於寫入文件的任務)。

你能提供一個示例代碼如何用C#做/ .NET 4.5?

http://blogs.msdn.com/b/csharpfaq/archive/2012/01/23/using-async-for-file-access-alan-berman.aspx

+0

的Rx 2.0可能是一個非常適合這種(一次節流第二階段到10),但我不熟悉不夠與它肯定地說。 : -/ –

回答

7

我認爲使用TPL數據流的,這將是一個好主意:在創建無界的並行性,以有限的並行文件寫入塊預處理和後處理塊,並將它們連接在一起。喜歡的東西:

var unboundedParallelismOptions = 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
    }; 

var preProcessBlock = new TransformBlock<string, string>(
    s => PreProcess(s), unboundedParallelismOptions); 

var writeToFileBlock = new TransformBlock<string, string>(
    async s => 
      { 
       await WriteToFile(s); 
       return s; 
      }, 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); 

var postProcessBlock = new ActionBlock<string>(
    s => PostProcess(s), unboundedParallelismOptions); 

var propagateCompletionOptions = 
    new DataflowLinkOptions { PropagateCompletion = true }; 

preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions); 
writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions); 

// use something like await preProcessBlock.SendAsync("text") here 

preProcessBlock.Complete(); 
await postProcessBlock.Completion; 

WriteToFile()看起來是這樣的:

private static async Task WriteToFile(string s) 
{ 
    using (var writer = new StreamWriter(GetFileName())) 
     await writer.WriteAsync(s); 
} 
+0

+1這很有趣..謝謝! –

+0

這裏有什麼'PreProcess'和'PostProcess'方法? – shashwat

+0

@shashwat他們做任何需要的。原來的問題是關於「I/O處理前後的任務」,所以我用這種方法來表示。 – svick

1

這聽起來像你要考慮Djikstra信號量來控制訪問任務的起點。

然而,這聽起來像一個典型的隊列/固定數量的消費者的類型的問題,這可能是更合適的方式來構造它。