我試圖將一條消息的副本從ActionBlock<int>
發送給多個使用者,這些使用者也是ActionBlock<int>
。這很好,但是如果其中一個目標塊引發異常,看起來這不會傳播到源塊。在這裏,我怎麼嘗試處理異常,但它從未進入到catch
部分:TPL DataFlow無法處理ActionBlock中的異常
static void Main(string[] args)
{
var t1 = new ActionBlock<int>(async i =>
{
await Task.Delay(2000);
Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t2 = new ActionBlock<int>(async i =>
{
await Task.Delay(1000);
Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t3 = new ActionBlock<int>(async i =>
{
await Task.Delay(100);
Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
if (i > 5)
throw new Exception("Too big number");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var targets = new [] { t1, t2, t3};
var broadcaster = new ActionBlock<int>(
async item =>
{
var processingTasks = targets.Select(async t =>
{
try
{
await t.SendAsync(item);
}
catch
{
Trace.TraceInformation("handled in select"); // never goes here
}
});
try
{
await Task.WhenAll(processingTasks);
}
catch
{
Trace.TraceInformation("handled"); // never goes here
}
});
for (var i = 1; i <= 10; i++)
broadcaster.Post(i);
}
我不知道什麼,我在這裏失蹤,但我希望能夠以檢索異常和目標塊已經發生了故障。
你只從'SendAsync''等待'Task',它只表示該項目是否被目標接受。如果任何一個目標拋出異常將被附加到該目標的「完成」任務的異常。爲了觀察這個異常,你需要「等待」那個任務,即「等待t3.Completion」。 – JSteward
一個簡單的解決方案可能是用'if(!await t.SendAsync(item))'替代'await t.SendAsync(item);'等待t.Completion;'這會將異常傳播到最內層'try/catch'。然後您可以再次拋出或將信息添加到新的例外中,例如哪個塊發生故障。然後你需要處理錯誤的「廣播電臺」,但你明白了。 – JSteward
@JSteward謝謝!我用'if(!await t.SendAsync(item))'代替t.Completion;'現在一切正常。發佈它作爲答案,以便我可以接受它。 –