2017-04-24 24 views
3

我正在創建一個使用TPL DataFlow的任務處理器。我將遵循生產者消費者模型,在生產者生產一些項目的時候需要處理一段時間,消費者不斷等待新項目到達。這裏是我的代碼:如何使用異常處理創建永無止境的DataFlow Mesh?

async Task Main() 
{ 
    var runner = new Runner(); 
    CancellationTokenSource cts = new CancellationTokenSource(); 
    Task runnerTask = runner.ExecuteAsync(cts.Token); 

    await Task.WhenAll(runnerTask); 
} 

public class Runner 
{ 
    public async Task ExecuteAsync(CancellationToken cancellationToken) { 
     var random = new Random(); 

     ActionMeshProcessor processor = new ActionMeshProcessor(); 
     await processor.Init(cancellationToken); 

     while (!cancellationToken.IsCancellationRequested) 
     { 
      await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more 

      int[] items = GetItems(random.Next(3, 7)); 

      await processor.ProcessBlockAsync(items); 
     } 
    } 

    private int[] GetItems(int count) 
    { 
     Random randNum = new Random(); 

     int[] arr = new int[count]; 
     for (int i = 0; i < count; i++) 
     { 
      arr[i] = randNum.Next(10, 20); 
     } 

     return arr; 
    } 
} 

public class ActionMeshProcessor 
{ 
    private TransformBlock<int, int> Transformer { get; set; } 
    private ActionBlock<int> CompletionAnnouncer { get; set; } 

    public async Task Init(CancellationToken cancellationToken) 
    { 
     var options = new ExecutionDataflowBlockOptions 
     { 
      CancellationToken = cancellationToken, 
      MaxDegreeOfParallelism = 5, 
      BoundedCapacity = 5 
     }; 


     this.Transformer = new TransformBlock<int, int>(async input => { 

      await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here! 

      if (input > 15) 
      { 
       throw new Exception($"I can't handle this number: {input}"); 
      } 

      return input + 1; 
     }, options); 

     this.CompletionAnnouncer = new ActionBlock<int>(async input => 
     { 
      Console.WriteLine($"Completed: {input}"); 

      await Task.FromResult(0); 
     }, options); 

     this.Transformer.LinkTo(this.CompletionAnnouncer); 

     await Task.FromResult(0); // what do I await here? 
    } 

    public async Task ProcessBlockAsync(int[] arr) 
    { 
     foreach (var item in arr) 
     { 
      await this.Transformer.SendAsync(item); // await if there are no free slots 
     }  
    } 
} 

我在上面添加了一個條件檢查來引發異常來模仿一個特例。

這裏是我的問題:

  • 什麼是我可以處理上述網狀異常不使整個網下的最佳方法是什麼?

  • 有沒有更好的方法來初始化/開始/繼續永不結束的DataFlow網格?

  • 我在哪裏等待完成?

我特地到this similar question

回答

3

例外

沒有什麼異步你init它可能是一個標準的同步構造。您可以在網格中處理異常,而無需通過在提供給塊的lamda中進行簡單的嘗試抓取網格。然後,您可以通過從網格中篩選結果或忽略以下塊中的結果來處理該情況。以下是一個過濾的例子。對於int的簡單情況,您可以使用int?並篩選出任何值爲null的值,或者您可以根據需要設置任意類型的魔術指標值。如果實際傳遞的是引用類型,則可以將null推出或將數據項標記爲可以由鏈接上的謂詞檢查的方式。

public class ActionMeshProcessor { 
    private TransformBlock<int, int?> Transformer { get; set; } 
    private ActionBlock<int?> CompletionAnnouncer { get; set; } 

    public ActionMeshProcessor(CancellationToken cancellationToken) { 
     var options = new ExecutionDataflowBlockOptions { 
      CancellationToken = cancellationToken, 
      MaxDegreeOfParallelism = 5, 
      BoundedCapacity = 5 
     }; 


     this.Transformer = new TransformBlock<int, int?>(async input => { 
      try { 
       await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here! 

       if (input > 15) { 
        throw new Exception($"I can't handle this number: {input}"); 
       } 

       return input + 1; 
      } catch (Exception ex) { 
       return null; 
      } 

     }, options); 

     this.CompletionAnnouncer = new ActionBlock<int?>(async input => 
     { 
      if (input == null) throw new ArgumentNullException("input"); 

      Console.WriteLine($"Completed: {input}"); 

      await Task.FromResult(0); 
     }, options); 

     //Filtering 
     this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null); 
     this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>()); 
    } 

    public async Task ProcessBlockAsync(int[] arr) { 
     foreach (var item in arr) { 
      await this.Transformer.SendAsync(item); // await if there are no free slots 
     } 
    } 
} 

完成

可以公開從處理器Complete()Completion並使用它們來await完成時,你的應用程序shutsdown,假設這是你唯一的一次會關閉網格。另外,請確保通過適當的鏈接傳播完成。

//Filtering 
    this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null); 
    this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>()); 
}   

public void Complete() { 
    Transformer.Complete(); 
} 

public Task Completion { 
    get { return CompletionAnnouncer.Completion; } 
} 

然後,根據您的樣品完成的最可能的地方是外循環駕駛你的處理:

public async Task ExecuteAsync(CancellationToken cancellationToken) { 
    var random = new Random(); 

    ActionMeshProcessor processor = new ActionMeshProcessor(); 
    await processor.Init(cancellationToken); 

    while (!cancellationToken.IsCancellationRequested) { 
     await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more 

     int[] items = GetItems(random.Next(3, 7)); 

     await processor.ProcessBlockAsync(items); 
    } 
    //asuming you don't intend to throw from cancellation 
    processor.Complete(); 
    await processor.Completion(); 

} 
+0

這看起來不錯,謝謝@JSteward,我將通過調試,看看我擊中任何其他邊緣情況並更新! – Amit