如何在兩個轉換塊完成時重新編寫代碼完成的代碼?我認爲完成意味着它被標記爲完成並且「外出隊列」是空的?TPL數據流,只有當所有源數據塊完成時才能保證完成
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
我編輯了代碼,爲每個變換塊添加一個輸入緩衝區計數。顯然,所有100個項目都被傳輸到每個轉換塊。但只要其中一個轉換塊完成,處理器塊就不會再接受任何項目,而是不完整轉換塊的輸入緩衝區只是刷新輸入緩衝區。
請注意,變換塊可能未收到來自廣播塊的所有消息。他們只收到_latest_消息。如果廣播塊提供的消息比變換塊可以接收它們的速度快,則變換塊將丟失消息。另外,如果你想確保消息順序等,你應該在SendAsync(i)'上等待'。 – urbanhusky