我同意其他人TPL數據流聽起來這是很好的解決方案。
要限制處理,你可以創建一個TransformBlock
實際上並沒有以任何方式轉換數據,它只是延緩它,如果它以前的數據後立即趕到:
static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
DateTime lastItem = DateTime.MinValue;
return new TransformBlock<T, T>(
async x =>
{
var waitTime = lastItem + delay - DateTime.UtcNow;
if (waitTime > TimeSpan.Zero)
await Task.Delay(waitTime);
lastItem = DateTime.UtcNow;
return x;
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
然後創建一個方法產生的數據(從0開始的整數例子):
static async Task Producer(ITargetBlock<int> target)
{
int i = 0;
while (await target.SendAsync(i))
i++;
}
它的異步寫入,因此,如果目標塊是不能夠馬上處理的項目,它會等待。
然後寫一個消費者方法:
static void Consumer(int i)
{
Console.WriteLine(i);
}
最後,連在一起這一切,並啓動它:
var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));
var consumerBlock = new ActionBlock<int>(
(Action<int>)Consumer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });
Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);
這裏,delayBlock
將接受最多一個每500毫秒項和方法可以並行運行多次。要完成處理,請撥打delayBlock.Complete()
。
如果你想爲你的#2添加一些緩存,你可以創建另一個TransformBlock
做那裏的工作,並將其鏈接到其他塊。
我認爲至少#2是不可能做到與TaskScheduler,因爲它處理「任務」,並且沒有辦法從中獲取這些信息。 – svick 2012-03-20 22:25:43
您是否對使用C#5/.Net 4.5的解決方案感興趣? – svick 2012-03-20 22:26:10
@svick絕對,我有幸被允許使用c#5 – Fen 2012-03-20 22:32:36