我有一個非常簡單的問題。我需要一種方法來輕鬆地對需要一些時間的消息執行一些處理。在處理過程中,可能會輸入新的請求,但除最後一個之外的所有請求都可以被丟棄。TPL Dataflow Broadcastblock丟棄最後一條消息
所以我認爲TPL Broadcastblock
應該這樣做,例如查看文檔和帖子,以及StackExchange。我創建了以下解決方案併爲其添加了一些單元測試,但在單元測試中,有時最後一項不會發送。
這不是我所期望的。如果它應該放棄任何東西,我會說它應該放棄第一項,因爲如果它不能處理消息它應該覆蓋它的緩衝區1。任何人都可以看到它是什麼?
任何幫助將不勝感激!
下面是塊代碼:
/// <summary>
/// This block will take items and perform the specified action on it. Any incoming messages while the action is being performed
/// will be discarded.
/// </summary>
public class DiscardWhileBusyActionBlock<T> : ITargetBlock<T>
{
private readonly BroadcastBlock<T> broadcastBlock;
private readonly ActionBlock<T> actionBlock;
/// <summary>
/// Initializes a new instance of the <see cref="DiscardWhileBusyActionBlock{T}"/> class.
/// Constructs a SyncFilterTarget{TInput}.
/// </summary>
/// <param name="actionToPerform">Thing to do.</param>
public DiscardWhileBusyActionBlock(Action<T> actionToPerform)
{
if (actionToPerform == null)
{
throw new ArgumentNullException(nameof(actionToPerform));
}
this.broadcastBlock = new BroadcastBlock<T>(item => item);
this.actionBlock = new ActionBlock<T>(actionToPerform, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
this.broadcastBlock.LinkTo(this.actionBlock);
this.broadcastBlock.Completion.ContinueWith(task => this.actionBlock.Complete());
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return ((ITargetBlock<T>)this.broadcastBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
public void Complete()
{
this.broadcastBlock.Complete();
}
public void Fault(Exception exception)
{
((ITargetBlock<T>)this.broadcastBlock).Fault(exception);
}
public Task Completion => this.actionBlock.Completion;
}
下面是用於測試的代碼:
[TestClass]
public class DiscardWhileBusyActionBlockTest
{
[TestMethod]
public void PostToConnectedBuffer_ActionNotBusy_MessageConsumed()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
buffer.Post(1);
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
[TestMethod]
public void PostToConnectedBuffer_ActionBusy_MessagesConsumedWhenActionBecomesAvailable()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
actionPerformer.SetBusy();
// 1st message will set the actionperformer to busy, 2nd message should be sent when
// it becomes available.
buffer.Post(1);
buffer.Post(2);
actionPerformer.SetAvailable();
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1, 2 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
[TestMethod]
public void PostToConnectedBuffer_ActionBusy_DiscardMessagesInBetweenAndProcessOnlyLastMessage()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
actionPerformer.SetBusy();
buffer.Post(1);
buffer.Post(2);
buffer.Post(3);
buffer.Post(4);
buffer.Post(5);
actionPerformer.SetAvailable();
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1, 5 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
private static void WaitForCompletion(IDataflowBlock source, IDataflowBlock target)
{
source.Complete();
target.Completion.Wait(TimeSpan.FromSeconds(1));
}
private static BufferBlock<int> SetupBuffer(ITargetBlock<int> block)
{
var buffer = new BufferBlock<int>();
buffer.LinkTo(block);
buffer.Completion.ContinueWith(task => block.Complete());
return buffer;
}
private class ActionPerformer
{
private readonly ManualResetEvent resetEvent = new ManualResetEvent(true);
public List<int> LastReceivedMessage { get; } = new List<int>();
public void Perform(int message)
{
this.resetEvent.WaitOne(TimeSpan.FromSeconds(3));
this.LastReceivedMessage.Add(message);
}
public void SetBusy()
{
this.resetEvent.Reset();
}
public void SetAvailable()
{
this.resetEvent.Set();
}
}
}
感謝您的回覆(以及附註;) 因此,如果我正確理解您,廣播區塊在覆蓋它的緩衝區時正在做它的工作,但Actionblock實際上是在放棄它們?有趣! 雖然在檢查Post的返回值時我不理解您的其他評論。我不會將消息發佈到動作塊,它是通過OfferMessage方法完成的。Bufferblock不會工作,因爲它會緩存所有傳入的消息,而我希望只有一個緩衝區每次在接收器繁忙時收到消息時被覆蓋。你能否澄清這件事? –
已更新的答案。 – VMAtm
好的,謝謝你的解釋。所以我得出結論,我不能使用內置的TPL邏輯來實現它,並且必須找到我自己的解決方案。再次感謝! –