我有一個應用程序,它在某些時候幾乎同時引發1000個事件。我想要做的是將事件批量分成50個項目,並開始每10秒處理一次。在開始新的批處理之前,不需要等待批量完成。反應式擴展:批量處理事件+在每批之間添加延遲
例如:
10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))
任何想法如何實現這一目標?我認爲Reactive Extensions是可行的,但其他解決方案也是可以接受的。
我試圖從這裏開始:
var bufferedItems = eventAsObservable
.Buffer(15)
.Delay(TimeSpan.FromSeconds(5)
但注意到,因爲我希望,而是所有批次同時啓動,雖然5秒延遲延遲沒有工作。
我也測試了Window方法,但我沒有注意到行爲上的任何差異。我想在窗口時間跨度實際上意味着「採取一切恰好在接下來的10秒事件:
var bufferedItems = eventAsObservable
.Window(TimeSpan.FromSeconds(10), 5)
.SelectMany(x => x)
.Subscribe(DoProcessing);
我使用的RX-主要2.0.20304-β
我希望我能給予好評不止一次這樣多!只花了三個小時試圖弄清楚。 –