2016-09-20 28 views
0

我正在使用TPL Dataflow,並且需要實現我自己的操作塊。TPL Dataflow:按順序處理來自兩個傳入塊的消息

該操作塊應接受來自兩個不同輸入塊的消息,將這些消息放入單個隊列中,然後按順序處理該隊列。這裏的重點是兩個不同的任務不應該同時執行,我不想使用鎖。

這是我的解決方案,但它不能正常工作。

public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock 
    where TInputLhs : class 
    where TInputRhs : class 
{ 
    public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } } 
    public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } } 


    private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>(); 
    private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>(); 

    private ITargetBlock<object> queue; 

    public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs) 
    { 
     queue = new ActionBlock<object>(x => 
     { 
      if (x is TInputLhs) 
      { 
       actionLhs(x as TInputLhs); 
      } 
      else 
      { 
       actionRhs(x as TInputRhs); 
      } 
     }); 

     inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 
     inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 
    } 

    public void Complete() 
    { 
     queue.Complete(); 
    } 

    public Task Completion 
    { 
     get { return queue.Completion; } 
    } 

    public void Fault(Exception exception) 
    { 
     queue.Fault(exception); 
    } 
} 

簡單使用例:

static void Main(string[] args) 
{ 
    var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0)); 

    var batchBlock = new BatchBlock<string>(3); 

    var processInOrderBlock = new OrderedActionBlock<string, string[]>(
     new Action<string>((str) => 
     { 
      Console.WriteLine(str); 
     }), 
     new Action<string[]>((batch) => 
     { 
      Console.WriteLine("BATCH - " + string.Join(", ", batch)); 
     })); 

    splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true }); 
    splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true }); 
    batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true }); 

    for (int i = 1; i <= 10; i++) 
    { 
     splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray())); 
    } 

    splitBlock.Complete(); 

    processInOrderBlock.Completion.Wait(); 

    return; 
} 

輸出:

xx 
xxxx 
xxxxxx 
xxxxxxxx 
xxxxxxxxxx 
BATCH - x, xxx, xxxxx 
Press any key to continue . . . 

看起來卡在batchBlock消息。我不知道爲什麼。

+0

爲什麼你不能只使用一個普通ActionBlock一個的並行限制嗎?你*幾乎*有,只有極限缺失。 – usr

+0

@usr,在這種情況下,我應該將檢查消息類型(是單個消息還是批處理)的代碼抽出到用戶代碼中,而我希望將這些基礎架構代碼保存在庫中的某個位置。此外,我不喜歡接受「某些對象」的ActionBlock(即ActionBlock ),更喜歡靜態輸入 –

+0

你能更清楚地說出爲什麼你不能只設置maxparallelism = 1嗎?這與配料有什麼關係? – usr

回答

2

貌似queue完成時inputLhs任何inputRhs完成(如果在連接過程中使用PropagateCompletion = true選項)。

因此,我們需要改變這一點:

inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 

這樣:

Task.WhenAll(InputLhs.Completion, InputRhs.Completion) 
    .ContinueWith(_ => queue.Complete()); 
相關問題