2017-02-16 77 views
3

我想使用向服務器發出一系列請求,但服務器的硬限速爲每秒10次請求。如果我試圖在循環中創建請求,它將達到速率限制,因爲所有請求都會同時發生。我如何使用RxJS無損評估限制請求5

for(let i = 0; i < 20; i++) { 
    sendRequest(); 
} 

ReactiveX有很多工具來修改可觀察的流,但我似乎無法找到實施速率限制的工具。我嘗試添加一個標準延遲,但這些請求仍然在同一時間觸發,比他們以前的時間晚了100ms。

const queueRequest$ = new Rx.Subject<number>(); 

queueRequest$ 
    .delay(100) 
    .subscribe(queueData => { 
    console.log(queueData); 
    }); 

const queueRequest = (id) => queueRequest$.next(id); 

function fire20Requests() { 
    for (let i=0; i<20; i++) { 
    queueRequest(i); 
    } 
} 

fire20Requests(); 
setTimeout(fire20Requests, 1000); 
setTimeout(fire20Requests, 5000); 

debounceTimethrottleTime運營商都相似,我正在尋找爲好,但是這是不是無損有損。我想保留所做的每一個請求,而不是丟棄早先的請求。

... 
queueRequest$ 
    .debounceTime(100) 
    .subscribe(queueData => { 
    sendRequest(); 
    }); 
... 

我如何不超過使用ReactiveX和觀測量速率限制這些請求到服務器?

回答

2

OP的self answer(和linked blog)中的實現始終強加一個不理想的延遲。

如果速率限制服務允許每秒10個請求,則應該可以在10毫秒內發出10個請求,只要下一個請求不再進行另一個990毫秒即可。

下面的實施應用可變延遲來確保限制被強制執行,並且延遲僅適用於會看到超出限制的請求。

function rateLimit(source, count, period) { 
 

 
    return source 
 
    .scan((records, value) => { 
 

 
     const now = Date.now(); 
 
     const since = now - period; 
 

 
     // Keep a record of all values received within the last period. 
 

 
     records = records.filter((record) => record.until > since); 
 
     if (records.length >= count) { 
 

 
     // until is the time until which the value should be delayed. 
 

 
     const firstRecord = records[0]; 
 
     const lastRecord = records[records.length - 1]; 
 
     const until = firstRecord.until + (period * Math.floor(records.length/count)); 
 

 
     // concatMap is used below to guarantee the values are emitted 
 
     // in the same order in which they are received, so the delays 
 
     // are cumulative. That means the actual delay is the difference 
 
     // between the until times. 
 

 
     records.push({ 
 
      delay: (lastRecord.until < now) ? 
 
      (until - now) : 
 
      (until - lastRecord.until), 
 
      until, 
 
      value 
 
     }); 
 
     } else { 
 
     records.push({ 
 
      delay: 0, 
 
      until: now, 
 
      value 
 
     }); 
 
     } 
 
     return records; 
 

 
    }, []) 
 
    .concatMap((records) => { 
 

 
     const lastRecord = records[records.length - 1]; 
 
     const observable = Rx.Observable.of(lastRecord.value); 
 
     return lastRecord.delay ? observable.delay(lastRecord.delay) : observable; 
 
    }); 
 
} 
 

 
const start = Date.now(); 
 
rateLimit(
 
    Rx.Observable.range(1, 30), 
 
    10, 
 
    1000 
 
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

+0

僅供參考,我已經更新了這個答案。 「直到」計算並不是最佳的;它應該基於第一條記錄 - 而不是現在。 – cartant

1

This blog post做解釋RxJS在丟棄事件是偉大的一個偉大的工作,以及他們如何來到了答案,但最終,你要尋找的代碼是:

queueRequest$ 
    .concatMap(queueData => Rx.Observable.of(queueData).delay(100)) 
    .subscribe(() => { 
    sendRequest(); 
    }); 

concatMap增加的串接新創建的可觀察流後面的可觀察點。此外,使用delay將事件推回100ms,允許每秒發生10次請求。 You can view the full JSBin here, which logs to the console instead of firing requests.

1

其實,還有與bufferTime()運營商和它的三個參數來做到這一點更簡單的方法:

bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize) 

這意味着我們可以使用bufferTime(1000, null, 10)這意味着我們將發射最大1秒後最多10個項目的緩衝區null表示我們想要在當前緩衝區發出後立即打開一個新的緩衝區。

function mockRequest(val) { 
    return Observable 
    .of(val) 
    .delay(100) 
    .map(val => 'R' + val); 
} 

Observable 
    .range(0, 55) 
    .concatMap(val => Observable.of(val) 
    .delay(25) // async source of values 
    // .delay(175) 
) 

    .bufferTime(1000, null, 10) // collect all items for 1s 

    .concatMap(buffer => Observable 
    .from(buffer) // make requests 
    .delay(1000) // delay this batch by 1s (rate-limit) 
    .mergeMap(value => mockRequest(value)) // collect results regardless their initial order 
    .toArray() 
) 
    // .timestamp() 
    .subscribe(val => console.log(val)); 

見現場演示:https://jsbin.com/mijepam/19/edit?js,console

你可以用不同的初始延遲實驗。只有25ms請求將分批由10發送:

[ 'R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9' ] 
[ 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R17', 'R18', 'R19' ] 
[ 'R20', 'R21', 'R22', 'R23', 'R24', 'R25', 'R26', 'R27', 'R28', 'R29' ] 
[ 'R30', 'R31', 'R32', 'R33', 'R34', 'R35', 'R36', 'R37', 'R38', 'R39' ] 
[ 'R40', 'R41', 'R42', 'R43', 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ] 
[ 'R50', 'R51', 'R52', 'R53', 'R54' ] 

但隨着.delay(175)因爲我們在1秒延遲的限制,我們將發出不到10個項目批次。

[ 'R0', 'R1', 'R2', 'R3', 'R4' ] 
[ 'R5', 'R6', 'R7', 'R8', 'R9', 'R10' ] 
[ 'R11', 'R12', 'R13', 'R14', 'R15' ] 
[ 'R16', 'R17', 'R18', 'R19', 'R20', 'R21' ] 
[ 'R22', 'R23', 'R24', 'R25', 'R26', 'R27' ] 
[ 'R28', 'R29', 'R30', 'R31', 'R32' ] 
[ 'R33', 'R34', 'R35', 'R36', 'R37', 'R38' ] 
[ 'R39', 'R40', 'R41', 'R42', 'R43' ] 
[ 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ] 
[ 'R50', 'R51', 'R52', 'R53', 'R54' ] 

然而,您可能需要什麼不同。由於.bufferTime(1000, ...)delay(1000),此解決方案最初會在2秒延遲後開始發射值。所有其他排放發生在1秒後。

你可能最終使用:

.bufferTime(1000, null, 10) 
.mergeAll() 
.bufferCount(10) 

這將始終收集10個項目,只有經過它會執行請求。這可能會更有效。