我在我的應用程序中實現了使用TPL Dataflow的生產者/消費者模式。我有大約40個塊的大數據流網格。網格中有兩個主要的功能部分:生產者部分和消費者部分。有時,消費者在處理來訪工作時應該連續爲生產者提供大量的工作。當消費者忙於某些指定數量的工作項目時,我想暫停生產者。否則,應用會消耗大量的內存/ CPU,並且行爲不可持續。當消費者不知所措時,如何讓快速製片人暫停?
我做了演示應用程序演示了這個問題:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false
};
var boundedOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false,
BoundedCapacity = 5
};
var bufferBlock = new BufferBlock<int>(boundedOptions);
var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
var broadcastBlock = new BroadcastBlock<int>(x => x, options);
var consumerBlock = new ActionBlock<int>(async x =>
{
var delay = 1000;
if (x > 10) delay = 5000;
await Task.Delay(delay);
Console.WriteLine(x);
}, boundedOptions);
producerBlock.LinkTo(bufferBlock);
bufferBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(producerBlock);
broadcastBlock.LinkTo(consumerBlock);
bufferBlock.Post(1);
consumerBlock.Completion.Wait();
}
}
}
該應用程序打印是這樣的:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079
這意味着生產商都在旋轉,推動信息消費。我希望它暫停並等待消費者完成當前部分工作,然後生產者應繼續爲消費者提供消息。
我的問題是類似於其他question的報價,但沒有正確回答。我嘗試過這個解決方案,它在這裏不起作用,允許生產者用消息來淹沒消費者。同樣設置BoundedCapacity
也不起作用。
我猜到目前爲止唯一的解決方案是製作我自己的塊,它將監視目標塊隊列並根據目標塊的隊列行爲。但我希望對這個問題有點矯枉過正。
你有沒有考慮過使用'Rx'?看看這個答案:http://stackoverflow.com/questions/2542764/tpl-vs-reactive-framework –
我希望不會有這樣的需要,因爲大量的時間花在數據流上,它適合我的需求很好。 – kseen
在您的演示中,製作人可以自己製作所有的信息,而無需通過廣播塊接收來自其自身的信息。你的真實代碼是否也是這樣,還是生產者→生產者循環是必要的? – svick