2017-07-31 42 views
1

我有一個非常簡單的問題。我需要一種方法來輕鬆地對需要一些時間的消息執行一些處理。在處理過程中,可能會輸入新的請求,但除最後一個之外的所有請求都可以被丟棄。TPL Dataflow Broadcastblock丟棄最後一條消息

所以我認爲TPL Broadcastblock應該這樣做,例如查看文檔和帖子,以及StackExchange。我創建了以下解決方案併爲其添加了一些單元測試,但在單元測試中,有時最後一項不會發送。

這不是我所期望的。如果它應該放棄任何東西,我會說它應該放棄第一項,因爲如果它不能處理消息它應該覆蓋它的緩衝區1。任何人都可以看到它是什麼?
任何幫助將不勝感激!

下面是塊代碼:

/// <summary> 
/// This block will take items and perform the specified action on it. Any incoming messages while the action is being performed 
/// will be discarded. 
/// </summary> 
public class DiscardWhileBusyActionBlock<T> : ITargetBlock<T> 
{ 
    private readonly BroadcastBlock<T> broadcastBlock; 

    private readonly ActionBlock<T> actionBlock; 

    /// <summary> 
    /// Initializes a new instance of the <see cref="DiscardWhileBusyActionBlock{T}"/> class. 
    /// Constructs a SyncFilterTarget{TInput}. 
    /// </summary> 
    /// <param name="actionToPerform">Thing to do.</param> 
    public DiscardWhileBusyActionBlock(Action<T> actionToPerform) 
    { 
     if (actionToPerform == null) 
     { 
      throw new ArgumentNullException(nameof(actionToPerform)); 
     } 

     this.broadcastBlock = new BroadcastBlock<T>(item => item); 
     this.actionBlock = new ActionBlock<T>(actionToPerform, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 }); 
     this.broadcastBlock.LinkTo(this.actionBlock); 
     this.broadcastBlock.Completion.ContinueWith(task => this.actionBlock.Complete()); 
    } 

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept) 
    { 
     return ((ITargetBlock<T>)this.broadcastBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept); 
    } 

    public void Complete() 
    { 
     this.broadcastBlock.Complete(); 
    } 

    public void Fault(Exception exception) 
    { 
     ((ITargetBlock<T>)this.broadcastBlock).Fault(exception); 
    } 

    public Task Completion => this.actionBlock.Completion; 
} 

下面是用於測試的代碼:

[TestClass] 
public class DiscardWhileBusyActionBlockTest 
{ 
    [TestMethod] 
    public void PostToConnectedBuffer_ActionNotBusy_MessageConsumed() 
    { 
     var actionPerformer = new ActionPerformer(); 

     var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform); 
     var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block); 

     buffer.Post(1); 

     DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block); 

     var expectedMessages = new[] { 1 }; 
     actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages); 
    } 

    [TestMethod] 
    public void PostToConnectedBuffer_ActionBusy_MessagesConsumedWhenActionBecomesAvailable() 
    { 
     var actionPerformer = new ActionPerformer(); 

     var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform); 
     var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block); 

     actionPerformer.SetBusy(); 

     // 1st message will set the actionperformer to busy, 2nd message should be sent when 
     // it becomes available. 
     buffer.Post(1); 
     buffer.Post(2); 

     actionPerformer.SetAvailable(); 

     DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block); 

     var expectedMessages = new[] { 1, 2 }; 
     actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages); 
    } 

    [TestMethod] 
    public void PostToConnectedBuffer_ActionBusy_DiscardMessagesInBetweenAndProcessOnlyLastMessage() 
    { 
     var actionPerformer = new ActionPerformer(); 

     var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform); 
     var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block); 

     actionPerformer.SetBusy(); 

     buffer.Post(1); 
     buffer.Post(2); 
     buffer.Post(3); 
     buffer.Post(4); 
     buffer.Post(5); 

     actionPerformer.SetAvailable(); 

     DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block); 

     var expectedMessages = new[] { 1, 5 }; 
     actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages); 
    } 

    private static void WaitForCompletion(IDataflowBlock source, IDataflowBlock target) 
    { 
     source.Complete(); 
     target.Completion.Wait(TimeSpan.FromSeconds(1)); 
    } 

    private static BufferBlock<int> SetupBuffer(ITargetBlock<int> block) 
    { 
     var buffer = new BufferBlock<int>(); 
     buffer.LinkTo(block); 
     buffer.Completion.ContinueWith(task => block.Complete()); 
     return buffer; 
    } 

    private class ActionPerformer 
    { 
     private readonly ManualResetEvent resetEvent = new ManualResetEvent(true); 

     public List<int> LastReceivedMessage { get; } = new List<int>(); 

     public void Perform(int message) 
     { 
      this.resetEvent.WaitOne(TimeSpan.FromSeconds(3)); 
      this.LastReceivedMessage.Add(message); 
     } 

     public void SetBusy() 
     { 
      this.resetEvent.Reset(); 
     } 

     public void SetAvailable() 
     { 
      this.resetEvent.Set(); 
     } 
    } 
} 

回答

0

當你級別的動作塊的BoundedCapacity1,這意味着,如果它處理完畢,並且已經有它的隊列中的項目,它將丟棄消息,這將超出範圍。所以基本上發生了什麼是你的塊做它的工作,在緩衝區已滿時拒絕新消息。之後,廣播塊完成,因爲整個消息被髮送給收件人,並且它調用Completion,完成整個管道。

您需要檢查最後一條消息的返回布爾值Post,或者更可能將最後一條消息存儲在某個變量中,以確保它將轉至管道。看起來你最好不要使用BroadcastBlock,因爲它的目的是將消息的副本提供給鏈接塊的數量,並且只需自己編寫你的邏輯。也許你可以用一個簡單的BufferBlock來代替。

更新OfferMessage方法也提供有關正在提供的消息的信息。我認爲你根本不需要緩衝塊,因爲你必須處理管道的非默認邏輯。更容易擁有像_lastMessage這樣的字段,存儲最後一個值,並在請求被actionBlock接受時將其擦除。您甚至可以完全刪除數據流依賴項,因爲您只需調用該請求的方法即可。

旁註:你可以link blocks with completion propagation集選項:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true }; 
this.broadcastBlock.LinkTo(this.actionBlock, linkOptions); 

這可以去除一些你的代碼與potentially dangerous ContinueWith使用。如果您需要異步行爲,您也可以await broadcastBlock.SendAsync()而不是Post

+0

感謝您的回覆(以及附註;) 因此,如果我正確理解您,廣播區塊在覆蓋它的緩衝區時正在做它的工作,但Actionblock實際上是在放棄它們?有趣! 雖然在檢查Post的返回值時我不理解您的其他評論。我不會將消息發佈到動作塊,它是通過OfferMessage方法完成的。Bufferblock不會工作,因爲它會緩存所有傳入的消息,而我希望只有一個緩衝區每次在接收器繁忙時收到消息時被覆蓋。你能否澄清這件事? –

+0

已更新的答案。 – VMAtm

+0

好的,謝謝你的解釋。所以我得出結論,我不能使用內置的TPL邏輯來實現它,並且必須找到我自己的解決方案。再次感謝! –