其實,還有與bufferTime()
運營商和它的三個參數來做到這一點更簡單的方法:
bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize)
這意味着我們可以使用bufferTime(1000, null, 10)
這意味着我們將發射最大1秒後最多10個項目的緩衝區或。 null
表示我們想要在當前緩衝區發出後立即打開一個新的緩衝區。
function mockRequest(val) {
return Observable
.of(val)
.delay(100)
.map(val => 'R' + val);
}
Observable
.range(0, 55)
.concatMap(val => Observable.of(val)
.delay(25) // async source of values
// .delay(175)
)
.bufferTime(1000, null, 10) // collect all items for 1s
.concatMap(buffer => Observable
.from(buffer) // make requests
.delay(1000) // delay this batch by 1s (rate-limit)
.mergeMap(value => mockRequest(value)) // collect results regardless their initial order
.toArray()
)
// .timestamp()
.subscribe(val => console.log(val));
見現場演示:https://jsbin.com/mijepam/19/edit?js,console
你可以用不同的初始延遲實驗。只有25ms
請求將分批由10發送:
[ 'R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9' ]
[ 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R17', 'R18', 'R19' ]
[ 'R20', 'R21', 'R22', 'R23', 'R24', 'R25', 'R26', 'R27', 'R28', 'R29' ]
[ 'R30', 'R31', 'R32', 'R33', 'R34', 'R35', 'R36', 'R37', 'R38', 'R39' ]
[ 'R40', 'R41', 'R42', 'R43', 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]
但隨着.delay(175)
因爲我們在1秒延遲的限制,我們將發出不到10個項目批次。
[ 'R0', 'R1', 'R2', 'R3', 'R4' ]
[ 'R5', 'R6', 'R7', 'R8', 'R9', 'R10' ]
[ 'R11', 'R12', 'R13', 'R14', 'R15' ]
[ 'R16', 'R17', 'R18', 'R19', 'R20', 'R21' ]
[ 'R22', 'R23', 'R24', 'R25', 'R26', 'R27' ]
[ 'R28', 'R29', 'R30', 'R31', 'R32' ]
[ 'R33', 'R34', 'R35', 'R36', 'R37', 'R38' ]
[ 'R39', 'R40', 'R41', 'R42', 'R43' ]
[ 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]
然而,您可能需要什麼不同。由於.bufferTime(1000, ...)
和delay(1000)
,此解決方案最初會在2秒延遲後開始發射值。所有其他排放發生在1秒後。
你可能最終使用:
.bufferTime(1000, null, 10)
.mergeAll()
.bufferCount(10)
這將始終收集10個項目,只有經過它會執行請求。這可能會更有效。
僅供參考,我已經更新了這個答案。 「直到」計算並不是最佳的;它應該基於第一條記錄 - 而不是現在。 – cartant