0
我正在使用TPL Dataflow,並且需要實現我自己的操作塊。TPL Dataflow:按順序處理來自兩個傳入塊的消息
該操作塊應接受來自兩個不同輸入塊的消息,將這些消息放入單個隊列中,然後按順序處理該隊列。這裏的重點是兩個不同的任務不應該同時執行,我不想使用鎖。
這是我的解決方案,但它不能正常工作。
public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock
where TInputLhs : class
where TInputRhs : class
{
public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } }
public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } }
private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>();
private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>();
private ITargetBlock<object> queue;
public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs)
{
queue = new ActionBlock<object>(x =>
{
if (x is TInputLhs)
{
actionLhs(x as TInputLhs);
}
else
{
actionRhs(x as TInputRhs);
}
});
inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
}
public void Complete()
{
queue.Complete();
}
public Task Completion
{
get { return queue.Completion; }
}
public void Fault(Exception exception)
{
queue.Fault(exception);
}
}
簡單使用例:
static void Main(string[] args)
{
var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0));
var batchBlock = new BatchBlock<string>(3);
var processInOrderBlock = new OrderedActionBlock<string, string[]>(
new Action<string>((str) =>
{
Console.WriteLine(str);
}),
new Action<string[]>((batch) =>
{
Console.WriteLine("BATCH - " + string.Join(", ", batch));
}));
splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true });
splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true });
for (int i = 1; i <= 10; i++)
{
splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray()));
}
splitBlock.Complete();
processInOrderBlock.Completion.Wait();
return;
}
輸出:
xx
xxxx
xxxxxx
xxxxxxxx
xxxxxxxxxx
BATCH - x, xxx, xxxxx
Press any key to continue . . .
看起來卡在batchBlock
消息。我不知道爲什麼。
爲什麼你不能只使用一個普通ActionBlock一個的並行限制嗎?你*幾乎*有,只有極限缺失。 – usr
@usr,在這種情況下,我應該將檢查消息類型(是單個消息還是批處理)的代碼抽出到用戶代碼中,而我希望將這些基礎架構代碼保存在庫中的某個位置。此外,我不喜歡接受「某些對象」的ActionBlock(即ActionBlock
你能更清楚地說出爲什麼你不能只設置maxparallelism = 1嗎?這與配料有什麼關係? – usr