2012-11-22 24 views
20

如何在兩個轉換塊完成時重新編寫代碼完成的代碼?我認爲完成意味着它被標記爲完成並且「外出隊列」是空的?TPL數據流,只有當所有源數據塊完成時才能保證完成

public Test() 
    { 
     broadCastBlock = new BroadcastBlock<int>(i => 
      { 
       return i; 
      }); 

     transformBlock1 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

     transformBlock2 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

     processorBlock = new ActionBlock<string>(i => 
      { 
       Console.WriteLine(i); 
      }); 

     //Linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    } 

    public void Start() 
    { 
     const int numElements = 100; 

     for (int i = 1; i <= numElements; i++) 
     { 
      broadCastBlock.SendAsync(i); 
     } 

     //mark completion 
     broadCastBlock.Complete(); 

     processorBlock.Completion.Wait(); 

     Console.WriteLine("Finished"); 
     Console.ReadLine(); 
    } 
} 

我編輯了代碼,爲每個變換塊添加一個輸入緩衝區計數。顯然,所有100個項目都被傳輸到每個轉換塊。但只要其中一個轉換塊完成,處理器塊就不會再接受任何項目,而是不完整轉換塊的輸入緩衝區只是刷新輸入緩衝區。

+0

請注意,變換塊可能未收到來自廣播塊的所有消息。他們只收到_latest_消息。如果廣播塊提供的消息比變換塊可以接收它們的速度快,則變換塊將丟失消息。另外,如果你想確保消息順序等,你應該在SendAsync(i)'上等待'。 – urbanhusky

回答

24

問題正是casperOne中說,他的回答。一旦第一個變換塊完成,處理器塊進入「整理模式」:它將處理其輸入隊列中的剩餘項目,但不會接受任何新項目。

還有一個更簡單的修復不是在兩個分裂的處理器模塊,但:不設置PropagateCompletion,而是手動設置處理器塊完成當兩個變換塊完整:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion) 
    .ContinueWith(_ => processorBlock.Complete()); 
+0

正是我所期待的。沒有意識到Task.WhenAll會返回一個等待任務,我的疏忽。 –

+0

我需要非常相同的,也許它爲時已晚,但是你可以發佈有關我需要添加Task.WhenAll構造的更新嗎? –

+0

@AttilaHajdrik可能在你的數據流設置代碼的末尾,在你的'LinkTo'附近。 – svick

22

這裏的問題是,您每次撥打LinkTo method時都要設置PropagateCompletion property,以便在轉換塊中鏈接塊和等待時間的不同。

從爲Complete methodIDataflowBlock interface(重點煤礦)的文檔:

信號到IDataflowBlock它不應該接受,也不產生任何更多的消息也沒有消耗任何更多的被推遲的郵件

因爲你在每個TransformBlock<TInput, TOutput>實例錯開了您的等待時間,transformBlock2(等待20毫秒)被transformBlock1之前完成(等待50毫秒)。 transformBlock2先完成,然後發送信號到processorBlock,然後說「我不接受其他任何東西」(並且transformBlock1尚未產生其所有消息)。

請注意,transformBlock1之前的處理transformBlock1不是絕對保證;線程池(假設您使用默認調度程序)將以不同的順序處理任務(但很可能不會,因爲它會在20 ms項目完成後從隊列中竊取工作)是可行的。

你的管道是這樣的:

  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      \   /
      processorBlock 

爲了解決這個問題,你想有一個管道,看起來像這樣:

  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      |    | 
processorBlock1 processorBlock2 

這是由剛剛創建了兩個單獨完成ActionBlock<TInput>情況下,像這樣:

// The action, can be a method, makes it easier to share. 
Action<string> a = i => Console.WriteLine(i); 

// Create the processor blocks. 
processorBlock1 = new ActionBlock<string>(a); 
processorBlock2 = new ActionBlock<string>(a); 


// Linking 
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 

然後你需要等待兩個處理器塊,而不是隻有一個:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait(); 

一個非常這裏重要的注意事項;在創建ActionBlock<TInput>時,默認情況下將實例上的MaxDegreeOfParallelism property傳遞給它設置爲1。

這意味着您傳遞給ActionBlock<TInput>的呼叫Action<T> delegate是線程安全的,每次只能執行一個呼叫。

因爲您現在有兩個ActionBlock<TInput>實例指向相同的Action<T>委託,您不能保證線程安全。

如果你的方法是線程安全的,那麼你不必做任何事情(這將允許你將MaxDegreeOfParallelism屬性設置爲DataflowBlockOptions.Unbounded,因爲沒有理由阻止)。

如果它是不是線程安全,並且您需要保證它,您需要使用傳統的同步原語,如lock statement

在這種情況下,你會做它像這樣(儘管它顯然沒有必要,因爲在Console classWriteLine method是線程安全的):

// The lock. 
var l = new object(); 

// The action, can be a method, makes it easier to share. 
Action<string> a = i => { 
    // Ensure one call at a time. 
    lock (l) Console.WriteLine(i); 
}; 

// And so on... 
+0

謝謝你的冗長答案,但我選擇了svick的答案,因爲它直接適用於TPL Dataflow並提供了一個非常簡潔和簡單的解決方案。 –

+2

如果對兩個操作塊使用相同的['ExclusiveScheduler'](http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.concurrentexclusiveschedulerpair.exclusivescheduler),則可以輕鬆避免鎖定。 – svick

7

的除了svick的答案:爲了與使用PropagateCompletion選項獲得的行爲保持一致,還需要在前面的塊發生故障時轉發異常。象下面的擴展方法需要照顧的是還有:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) { 
    if (target == null) return; 
    if (sources.Length == 0) { target.Complete(); return; } 
    Task.Factory.ContinueWhenAll(
     sources.Select(b => b.Completion).ToArray(), 
     tasks => { 
      var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList(); 
      if (exceptions.Count != 0) { 
       target.Fault(new AggregateException(exceptions)); 
      } else { 
       target.Complete(); 
      } 
     } 
    ); 
} 
0

其他的答案是爲什麼PropagateCompletion =真誤事當塊有兩個以上的來源很清楚。

爲了提供一個簡單的解決方案,您可能需要查看一個開源庫DataflowEx,它可以通過內置的智能補全規則解決這類問題。 (它使用TPL數據流內部連接,但支持複雜完成傳播。實現看起來類同WhenAll還要處理動態鏈接添加,請檢查IMPL細節Dataflow.RegisterDependency()TaskEx.AwaitableWhenAll()。)

我稍微改變你的代碼,使一切工作使用DataflowEx:

public CompletionDemo1() 
{ 
    broadCaster = new BroadcastBlock<int>(
     i => 
      { 
       return i; 
      }).ToDataflow(); 

    transformBlock1 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

    transformBlock2 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock2.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

    processor = new ActionBlock<string>(
     i => 
      { 
       Console.WriteLine(i); 
      }).ToDataflow(); 

    /** rather than TPL linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    **/ 

    //Use DataflowEx linking 
    var transform1 = transformBlock1.ToDataflow(); 
    var transform2 = transformBlock2.ToDataflow(); 

    broadCaster.LinkTo(transform1); 
    broadCaster.LinkTo(transform2); 
    transform1.LinkTo(processor); 
    transform2.LinkTo(processor); 
} 

的完整代碼here

聲明:我是DataflowEx的作者,它是在MIT許可下發布的。

+0

如果你在Gridsum工作,你可否請公開?我的問題明確提到我需要TPL Dataflow的答案,我不想爲這個問題使用第三方解決方案。謝謝。 –

+1

是的,我爲Gridsum工作。但圖書館是完全免費和開源的,所以我認爲它可以幫助你。根本沒有商業思維。如果您需要的是關於TPL Dataflow的內部機制,請不要忽略我的答案。但是如果有人需要解決方案*答案有其價值。謝謝:) – Dodd

+0

更詳細地更新了答案。免責聲明還添加 – Dodd

相關問題