那些熟悉lmax ring buffer (disruptor)的人知道該數據結構的最大優勢之一在於它批量處理事件,並且當我們有一個消費者可以利用批處理使系統自動可調對負載來說,你投入的事件越多越好。可觀察到像Lmax Disruptor這樣的批處理
不知想不出我們實現與可觀察到同樣的效果(針對批處理功能)。我已經嘗試過Observable.buffer,但這是非常不同的,緩衝區將等待並且不發出批處理,而預期的事件數沒有到達。我們想要的是完全不同的。
給定子訂閱者正在等待Observable<Collection<Event>>
的批次,當單個項目到達流時,它發出一個由訂閱者處理的單個元素批處理,而它正在處理其他元素到達並收集到下一個批處理中一旦用戶完成執行,它就會獲得下一個批次以及自從它開始上次處理以來到達的所有事件...
因此,如果我們的訂戶足夠快以一次處理一個事件,它將這樣做,如果負載變得更高,它仍然會有相同的處理頻率,但每次都會有更多的事件(從而解決背壓問題)......不像緩衝區會等待批量填滿。
有什麼建議嗎?或者我應該使用環形緩衝區嗎?
我相信這個操作被稱爲'bufferIntrospective',看到http://stackoverflow.com/questions/28880247/buffer-while-processing-items –