2017-03-23 38 views
2

我已經建立了使用TPL Dataflow兩條管道:鏈接多個TransformBlocks單挑BatchBlock TPL

TransformBlock => TransformBlock => BatchBlock => .... 

TransformBlock => BatchBlock => TransformBlock => .... 

我要完成

  /=> Transform Block => TransformBlock => BatchBlock => .... 
BatchBlock/
      \ 
      \ => Transform Block => BatchBlock => TransformBlock => .... 

但只有第一個管道被執行。

我的代碼

batchMediaBlock.LinkTo(pipelineA.FirstBlock, new DataflowLinkOptions {PropagateCompletion = true}); 
batchMediaBlock.LinkTo(pipelineB.FirstBlock, new DataflowLinkOptions {PropagateCompletion = true}); 

我怎樣才能做到這一點?

回答

1

您需要在之後BatchBlock。但是請注意,完成只會傳播到您的TransformBlock之一。請參閱下面的部分例子來辦理竣工:

using System.Threading.Tasks.Dataflow; 

namespace MyDataflow { 
    class MyDataflow { 

     public void HandlingCompletion() { 
      var batchBlock = new BatchBlock<int>(10); 
      var broadcastBlock = new BroadcastBlock<int[]>(_ => _); 
      var xForm1 = new TransformBlock<int[], int[]>(_ => _); 
      var xForm2 = new TransformBlock<int[], int[]>(_ => _); 

      batchBlock.LinkTo(broadcastBlock, new DataflowLinkOptions() { PropagateCompletion = true }); 
      broadcastBlock.LinkTo(xForm1); 
      broadcastBlock.LinkTo(xForm1); 

      broadcastBlock.Completion.ContinueWith(broadcastBlockCompletionTask => { 
       if (!broadcastBlockCompletionTask.IsFaulted) { 
        xForm1.Complete(); 
        xForm2.Complete(); 
       }else { 
        ((IDataflowBlock)xForm1).Fault(broadcastBlockCompletionTask.Exception); 
        ((IDataflowBlock)xForm2).Fault(broadcastBlockCompletionTask.Exception); 
       } 

      }); 

      xForm1.Completion.ContinueWith(async _ => { 
       try { 
        await xForm2.Completion; 
        //continue passing completion/fault on to rest of pipeline 
       } catch { 

       } 
      }); 

     } 
    } 
} 

另外,如果您的管道從來沒有收斂再次,你可以單獨處理完成的繼續BroacastBlock後每個流水線。所提供的示例將同時完成管道中的每個步驟,同步完成。

1

默認情況下,TPL數據流的連接被認爲是貪婪的,所以第一目標總是得到消息先前塊」輸出刪除它,這就是爲什麼你的第二塊沒有得到任何消息。這種情況下可以通過BroadcastBlock<T>加以解決,這

確保當前元素在允許元件被覆蓋之前廣播任何鏈接目標。

您還應該注意,此塊做的克隆的消息。

所以你基本上應該在你的批處理塊後添加一個廣播,但是!你不應該把你的完成從廣播塊傳播給消費者 - 只有第一個纔會完成。正如@JSteward所建議的,您應該爲廣播添加ContinueWith處理程序。