2012-06-07 56 views
10

我有一個應用程序,它在某些時候幾乎同時引發1000個事件。我想要做的是將事件批量分成50個項目,並開始每10秒處理一次。在開始新的批處理之前,不需要等待批量完成。反應式擴展:批量處理事件+在每批之間添加延遲

例如:

10:00:00: 10000 new events received 
10:00:00: StartProcessing (events.Take(50)) 
10:00:10: StartProcessing (events.Skip(50).Take(50)) 
10:00:15: StartProcessing (events.Skip(100).Take(50)) 

任何想法如何實現這一目標?我認爲Reactive Extensions是可行的,但其他解決方案也是可以接受的。

我試圖從這裏開始:

 var bufferedItems = eventAsObservable 
      .Buffer(15) 
      .Delay(TimeSpan.FromSeconds(5) 

但注意到,因爲我希望,而是所有批次同時啓動,雖然5秒延遲延遲沒有工作。

我也測試了Window方法,但我沒有注意到行爲上的任何差異。我想在窗口時間跨度實際上意味着「採取一切恰好在接下來的10秒事件:

 var bufferedItems = eventAsObservable 
      .Window(TimeSpan.FromSeconds(10), 5) 
      .SelectMany(x => x) 
      .Subscribe(DoProcessing); 

我使用的RX-主要2.0.20304-β

回答

20

如果你不想睡覺線程,你可以這樣做:

var tick = Observable.Interval(TimeSpan.FromSeconds(5)); 

eventAsObservable 
.Buffer(50) 
.Zip(tick, (res, _) => res) 
.Subscribe(DoProcessing); 
+1

我希望我能給予好評不止一次這樣多!只花了三個小時試圖弄清楚。 –

1

試試這個:

var bufferedItems = eventAsObservable 
     .Buffer(50) 
     .Do(_ => { Thread.Sleep(10000); }); 
相關問題