你需要什麼東西來緩衝值,然後當工人 準備,它要求當前緩衝區,然後重置它。這可以 與RX和任務的組合
class TicTac<Stuff> {
private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>();
List<Stuff> in = new List<Stuff>();
public void push(Stuff stuff){
lock(this){
if(in == null){
in = new List<Stuff>();
Items.SetResult(in);
}
in.Add(stuff);
}
}
private void reset(){
lock(this){
Items = new TaskCompletionSource<List<Stuff>>();
in = null;
}
}
public async Task<List<Stuff>> Items(){
List<Stuff> list = await Items.Task;
reset();
return list;
}
}
然後
var tictac = new TicTac<double>();
IObservable<double> source = ....
source.Subscribe(x=>tictac.Push(x));
然後做你的工人
while(true){
var items = await tictac.Items();
Thread.Sleep(100);
for each (item in items){
Console.WriteLine(item);
}
}
[本頁](http://leecampbell.blogspot.com.au/2011/03/rx-part-9join-window-buffer-and-group.html)可能有助於緩衝區過載。整個系列非常有幫助 –
您是否在TPL數據流中嘗試過'BufferBlock'? – Asti