2

那些熟悉lmax ring buffer (disruptor)的人知道該數據結構的最大優勢之一在於它批量處理事件,並且當我們有一個消費者可以利用批處理使系統自動可調對負載來說,你投入的事件越多越好。可觀察到像Lmax Disruptor這樣的批處理

不知想不出我們實現與可觀察到同樣的效果(針對批處理功能)。我已經嘗試過Observable.buffer,但這是非常不同的,緩衝區將等待並且不發出批處理,而預期的事件數沒有到達。我們想要的是完全不同的。

給定子訂閱者正在等待Observable<Collection<Event>>的批次,當單個項目到達流時,它發出一個由訂閱者處理的單個元素批處理,而它正在處理其他元素到達並收集到下一個批處理中一旦用戶完成執行,它就會獲得下一個批次以及自從它開始上次處理以來到達的所有事件...

因此,如果我們的訂戶足夠快以一次處理一個事件,它將這樣做,如果負載變得更高,它仍然會有相同的處理頻率,但每次都會有更多的事件(從而解決背壓問題)......不像緩衝區會等待批量填滿。

有什麼建議嗎?或者我應該使用環形緩衝區嗎?

+1

我相信這個操作被稱爲'bufferIntrospective',看到http://stackoverflow.com/questions/28880247/buffer-while-processing-items –

回答

5

RxJava和干擾器代表了兩種不同的編程方法。

我沒有經歷過與干擾物,但基於視頻講座,它基本上是一個大的緩衝區,其中生產者發出的數據像流水和消費者旋/產/塊,直到數據可用。

RxJava,另一方面,目的在於非阻塞事件傳遞。我們也有環形緩衝器,特別是在觀察中,它作爲生產者和消費者之間的異步邊界,但這些要小得多,我們通過應用協同程序方法避免緩衝區溢出和緩衝膨脹。共同例程歸結爲發送到您的回調的回調,以便您可以回調我們的回調以按照您的速度向您發送一些數據。這些請求的頻率決定了調整。

有不支持這樣的協流和所需要的onBackpressureXXX運營商,將緩衝/降值如果下游沒有請求足夠快的一個數據源。

如果您認爲您可以比逐一更有效地批量處理數據,則可以使用buffer運算符,該運算符具有過載以指定緩衝區的持續時間:例如,您可以擁有10 ms的值數據,與這段時間內到達有多少值無關。

控制經由請求頻率的分批大小是棘手的,並且可以具有不可預見的後果。一般來說,問題是如果您從批處理源中獲得request(n),則表明您可以處理n個元素,但源代碼現在必須創建n個大小爲1的緩衝區(因爲類型爲Observable<List<T>>)。相反,如果沒有請求被調用,操作員緩衝數據導致更長的緩衝區。這些行爲在處理過程中會帶來額外的開銷,如果你真的能夠保持並且還必須將冷源變成冷熱源(因爲否則你所擁有的基本上就是buffer(1)),這本身現在可以導致緩衝膨脹。