2017-03-12 31 views
0

我的觀察者過度生產,我需要處理所有元素。當我第一次訂閱我想盡快收到元素,但我想緩衝其他人,直到 a)處理第一個元素 AND b)1秒超時。RxJava發出第一個元素並緩衝其他

我目前的實現是:

connectionService.subscribe(request) 
.buffer(1, TimeUnit.SECONDS) 
.flatMap(merger) 
.subscribe(...) 

的 「合併」 做的工作。有2個問題是:

1)的第一個元素將有1秒的延遲太多,但它應該可以立刻

2)如果合併時間超過1秒我收到下一行時元素以前不被處理(注意,合併的第一個元素時,只有一個問題)

+0

這是對的:你有一個可觀察的元素。每個元素必須一個接一個地處理。如果一個元素的「處理」花費的時間超過一秒,那麼下一個元素可以被處理並被處理?如果它不到一秒鐘,你想採取下一個並處理。這是對的嗎? –

+0

@HansWurst每個元素都依賴於前一個元素(它們是json補丁字符串http://jsonpatch.com/)。每個元素都必須按順序處理。第一個元素是初始json,因此需要分別計算,但下一個元素(json拼貼元素)可以合併。 – adam0404

+1

好吧,第一個emmited元素是init。 JSON。以下所有是必須應用於init的補丁。 JSON。看起來,您會使用reduce/scan將每個發出的補丁應用到初始json,最後您將獲得應用了所有補丁的json。但它可能是,製作人產生的速度比你應用這些補丁更快。爲此,您可以使用rxjava的背壓。這是對的嗎? –

回答

0

使用這樣的(假設RxJava 1):

connectionService.subscribe(request) 
.onBackpressureBuffer(...) 
.flatMap(merger, 1) 
.subscribe(...) 

使用onBackpressureBuffer操作的參數來指定發生了什麼當它溢出時。

.flatMap(..., 1)確保從上游請求正好1件物品。

相關問題