我試圖通過在適當時執行批量請求來優化與RxJava的Web服務調用,但不會在響應中引入太多延遲。 對於我使用buffer(closingSelector)
運營商debounce()
像這樣關選擇:Rx緩衝限制+超時的快速生成流
Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast.debounce(windowSize, windowUnit);
burstyMulticast.buffer(burstyDeBounced).subscribe(/* call external WS with batches */);
它工作正常,但如果requestStream
產生太快會發出巨大的批次,過大,WS處理一次,所以我想以某種方式限制批量大小。 所以我需要一個closingSelector
,發出一個關閉事件,如果緩衝區中有X個項目,或者自上一個項目從上游到達後的Y個時間量。
除了實現類似於OperatorDebounceWithTime
的自定義Operator
外,我似乎無法找到一個好的解決方案,但有一個內部緩衝區,它返回緩衝區中的所有元素,而不是最後一個元素。
有沒有一種更簡單的方法來實現這個例如通過結合一些操作?
編輯:
張貼的問題後,我意識到,上面的代碼塊有另外一個問題:如果請求連續流動速度比反跳超時(requestStream
是生產比windowSize
更快),那麼burstyDeBounced
不會散發出任何東西因此所有的請求都會被緩存,直到傳入流中有足夠長的暫停。
您是否找到了解決這個問題的最佳解決方案?如果是的話,你可以發佈它?乾杯 – cerisier