2012-11-27 57 views
7

我有一個程序,我正在接收事件並希望分批處理它們,以便在處理當前批次時進入的所有項目都將出現在下一批次中。無效擴展:緩衝區直到用戶空閒

Rx中簡單的TimeSpan和基於計數的緩衝區方法會給我多批次的項目,而不是給我一大堆所有進來的項目(在訂戶需要的時間超過指定的TimeSpan或超過N項目進來,N大於計數)。

我看着使用更復雜的緩衝區過載,其中採取Func<IObservable<TBufferClosing>>IObservable<TBufferOpening> and Func<TBufferOpening, IObservable<TBufferClosing>>,但我找不到如何使用這些的例子,更不知道如何將它們應用於我想要做的。

+0

[本頁](http://leecampbell.blogspot.com.au/2011/03/rx-part-9join-window-buffer-and-group.html)可能有助於緩衝區過載。整個系列非常有幫助 –

+0

您是否在TPL數據流中嘗試過'BufferBlock'? – Asti

回答

1

這是做你想做的嗎?

var xs = new Subject<int>(); 
var ys = new Subject<Unit>(); 

var zss = 
    xs.Buffer(ys); 

zss 
    .ObserveOn(Scheduler.Default) 
    .Subscribe(zs => 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine(String.Join("-", zs)); 
     ys.OnNext(Unit.Default); 
    }); 

ys.OnNext(Unit.Default); 
xs.OnNext(1); 
Thread.Sleep(200); 
xs.OnNext(2); 
Thread.Sleep(600); 
xs.OnNext(3); 
Thread.Sleep(400); 
xs.OnNext(4); 
Thread.Sleep(300); 
xs.OnNext(5); 
Thread.Sleep(900); 
xs.OnNext(6); 
Thread.Sleep(100); 
xs.OnNext(7); 
Thread.Sleep(1000); 

我的結果:

1-2-3 
4-5 
6-7 
+0

刪除ObserveOn使程序在一個線程上執行導致它中斷。 –

+0

@ChrisEldredge - 是的。您可能必須允許多線程才能使其發揮作用。 – Enigmativity

+0

當隊列爲空時,它也會導致忙碌的等待(將CPU掛在100%處)。如果您在zs.Count == 0時刪除睡眠,您將看到尖峯。 –

1

你需要什麼東西來緩衝值,然後當工人 準備,它要求當前緩衝區,然後重置它。這可以 與RX和任務的組合

class TicTac<Stuff> { 

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>(); 

    List<Stuff> in = new List<Stuff>(); 

    public void push(Stuff stuff){ 
     lock(this){ 
      if(in == null){ 
       in = new List<Stuff>(); 
       Items.SetResult(in); 
      } 
      in.Add(stuff); 
     } 
    } 

    private void reset(){ 
     lock(this){ 
      Items = new TaskCompletionSource<List<Stuff>>(); 
      in = null; 
     } 
    } 

    public async Task<List<Stuff>> Items(){ 
     List<Stuff> list = await Items.Task; 
     reset(); 
     return list; 
    } 
} 

然後

var tictac = new TicTac<double>(); 

IObservable<double> source = .... 

source.Subscribe(x=>tictac.Push(x)); 

然後做你的工人

while(true){ 

    var items = await tictac.Items(); 

    Thread.Sleep(100); 

    for each (item in items){ 
     Console.WriteLine(item); 
    } 

} 
+1

我想這是可行的,但爲什麼要直接使用TPL/APM來使用Reactive Extensions? –

+0

ReactiveExtensions是一個框架,你可以隨意擴展它,我經常爲自定義的東西添加自己的操作符 – bradgonesurfing

+2

@ChrisEldredge公平地說,你所要求的不適合Rx框架;訂戶無法向觀察者發出信號,表示它閒置或不在,你必須這樣做。 – casperOne

1

之前,我已經做到了這一點的方法是拉起觀察DotPeek/Reflector中的方法,並採用它具有的排隊概念並使其適應我們的要求。例如,在具有快速滴答數據的UI應用程序(如財務)中,UI線程可能會充滿事件,有時它不能足夠快速地更新。在這些情況下,我們希望刪除除最後一個以外的所有事件(針對特定儀器)。在這種情況下,我們將ObserveOn的內部隊列更改爲T的單個值(查找ObserveLatestOn(IScheduler))。在你的情況下,你想要隊列,但是你想推動整個隊列不只是第一個值。這應該讓你開始。