2016-09-09 51 views
1

我試圖通過在適當時執行批量請求來優化與RxJava的Web服務調用,但不會在響應中引入太多延遲。 對於我使用buffer(closingSelector)運營商debounce()像這樣關選擇:Rx緩衝限制+超時的快速生成流

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share(); 
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast.debounce(windowSize, windowUnit); 
burstyMulticast.buffer(burstyDeBounced).subscribe(/* call external WS with batches */); 

它工作正常,但如果requestStream產生太快會發出巨大的批次,過大,WS處理一次,所以我想以某種方式限制批量大小。 所以我需要一個closingSelector,發出一個關閉事件,如果緩衝區中有X個項目,或者自上一個項目從上游到達後的Y個時間量。

除了實現類似於OperatorDebounceWithTime的自定義Operator外,我似乎無法找到一個好的解決方案,但有一個內部緩衝區,它返回緩衝區中的所有元素,而不是最後一個元素。

有沒有一種更簡單的方法來實現這個例如通過結合一些操作?

編輯:

張貼的問題後,我意識到,上面的代碼塊有另外一個問題:如果請求連續流動速度比反跳超時(requestStream是生產比windowSize更快),那麼burstyDeBounced不會散發出任何東西因此所有的請求都會被緩存,直到傳入流中有足夠長的暫停。

+0

您是否找到了解決這個問題的最佳解決方案?如果是的話,你可以發佈它?乾杯 – cerisier

回答

0

你可以去抖源的大容量緩存分割成更小的:

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share(); 
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast 
    .debounce(windowSize, windowUnit); 

burstyMulticast.buffer(burstyDeBounced) 
.onBackpressureBuffer() 
.concatMapIterable(list -> Lists.partition(list, windowSizeLimit)) 
.subscribe(...); 

其中Lists.partition是谷歌番石榴。

+0

感謝您的意見,不幸的是我意識到上述原始代碼片段存在另一個問題,所以我無法使用debounce,請在原始帖子中查看我的編輯。 –