2017-09-06 83 views
1

我試圖將一條消息的副本從ActionBlock<int>發送給多個使用者,這些使用者也是ActionBlock<int>。這很好,但是如果其中一個目標塊引發異常,看起來這不會傳播到源塊。在這裏,我怎麼嘗試處理異常,但它從未進入到catch部分:TPL DataFlow無法處理ActionBlock中的異常

static void Main(string[] args) 
{ 
    var t1 = new ActionBlock<int>(async i => 
    { 
     await Task.Delay(2000); 
     Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}"); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 }); 

    var t2 = new ActionBlock<int>(async i => 
    { 
     await Task.Delay(1000); 
     Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}"); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 }); 

    var t3 = new ActionBlock<int>(async i => 
    { 
     await Task.Delay(100); 
     Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}"); 
     if (i > 5) 
      throw new Exception("Too big number"); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 }); 

    var targets = new [] { t1, t2, t3}; 

    var broadcaster = new ActionBlock<int>(
     async item => 
     { 
      var processingTasks = targets.Select(async t => 
      { 
       try 
       { 
        await t.SendAsync(item); 
       } 
       catch 
       { 
        Trace.TraceInformation("handled in select"); // never goes here 
       } 
      }); 

      try 
      { 
       await Task.WhenAll(processingTasks); 
      } 
      catch 
      { 
       Trace.TraceInformation("handled"); // never goes here 
      } 
     }); 

    for (var i = 1; i <= 10; i++) 
     broadcaster.Post(i); 
} 

我不知道什麼,我在這裏失蹤,但我希望能夠以檢索異常和目標塊已經發生了故障。

+0

你只從'SendAsync''等待'Task',它只表示該項目是否被目標接受。如果任何一個目標拋出異常將被附加到該目標的「完成」任務的異常。爲了觀察這個異常,你需要「等待」那個任務,即「等待t3.Completion」。 – JSteward

+0

一個簡單的解決方案可能是用'if(!await t.SendAsync(item))'替代'await t.SendAsync(item);'等待t.Completion;'這會將異常傳播到最內層'try/catch'。然後您可以再次拋出或將信息添加到新的例外中,例如哪個塊發生故障。然後你需要處理錯誤的「廣播電臺」,但你明白了。 – JSteward

+0

@JSteward謝謝!我用'if(!await t.SendAsync(item))'代替t.Completion;'現在一切正常。發佈它作爲答案,以便我可以接受它。 –

回答

1

如果一個塊進入故障狀態,它將不再接受新的項目,並且它投擲的Exception將被附加到它的Completion任務和/或如果在管道中被鏈接完成傳播它。若要觀察Exception,您可以await完成,如果該塊拒絕更多的項目。

var processingTasks = targets.Select(async t => 
{ 
    try 
    { 
     if(!await t.SendAsync(item)) 
      await t.Completion; 
    } 
    catch 
    { 
     Trace.TraceInformation("handled in select"); // never goes here 
    } 
}); 
+0

雖然有一個問題。在我看來,即使't.SendAsync(item)'發送消息以便t之後的下一個塊按順序接收它們。我雖然DataFlow waranted消息處理的順序? –

+0

您的每個目標都將按照發送到您的「廣播公司」的順序接收消息。如果最多隻有一個目標失敗,廣播公司將被拒絕並拒絕新消息。使用默認塊選項「等待Task.WhenAll」可確保所有目標在處理下一個目標之前已接受單個消息。你能否提供一個不按順序顯示消息的例子或用例? – JSteward

+0

這是我的錯。我以'ActionBlock '的身份作爲參數傳遞爲'Action '而不是'Func ' –