2017-03-26 25 views
2

TL; DR - 我正在尋找一種方法來控制在使用RxJS時同時連接到REST API的HTTP請求的數量。有沒有辦法用RxJS來管理併發?

我的Node.js應用程序將對第三方提供商進行幾千次REST API調用。但是,我知道如果我一次完成所有這些請求,由於DDoS攻擊,服務可能會停止或拒絕我的請求。所以,我想在任何給定時間設置最大併發連接數。我曾經通過使用Throat Package來實現Promises的併發控制,但是我還沒有找到類似的方法來實現它。

我試圖使用merge與1同時發生在此帖子How to limit the concurrency of flatMap?中,但所有請求都一次發送。

這裏是我的代碼:

var Rx = require('rx'), 
    rp = require('request-promise'); 

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3' 
]; 

var source = Rx.Observable.fromArray(array).map(httpGet).merge(1); 

function httpGet(url) { 
    return rp.get(url); 
} 

var results = []; 
var subscription = source.subscribe(
    function (x) { 
    console.log('=====', x, '======'); 
    }, 
    function (err) { 
    console.log('Error: ' + err); 
    }, 
    function() { 
    console.log('Completed'); 
    }); 
+2

請參閱http://stackoverflow.com/documentation/rxjs/8247/common-recipes/27973/sending-multiple-parallel-http-requests#t=201703261009146257815 – martin

回答

1

感謝以上的答覆。我的問題與使用rx而不是rxjs NPM模塊有關。在我卸載了rx並安裝了rxjs之後,所有示例都開始按預期使用併發。因此,具有Promises,Callbacks和Native Observables的http併發調用正常工作。

我在這裏發佈它們以防萬一任何人遇到類似問題並且可以排除故障。

基於HTTP的回調索取樣品:

var Rx = require('rxjs'), 
    request = require('request'), 
    request_rx = Rx.Observable.bindCallback(request.get); 

var array = [ 
    'https://httpbin.org/ip', 
    'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3' 
]; 

var source = Rx.Observable.from(array).mergeMap(httpGet, 1); 

function httpGet(url) { 
    return request_rx(url); 
} 

var subscription = source.subscribe(
    function (x, body) { 
    console.log('=====', x[1].body, '======'); 
    }, 
    function (err) { 
    console.log('Error: ' + err); 
    }, 
    function() { 
    console.log('Completed'); 
    }); 

答應爲基礎的樣品:

var Rx = require('rxjs'), 
    rp = require('request-promise'); 

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3' 
]; 

var source = Rx.Observable.from(array).mergeMap(httpGet, 1); 

function httpGet(url) { 
    return rp.get(url); 
} 

var results = []; 
var subscription = source.subscribe(
    function (x) { 
    console.log('=====', x, '======'); 
    }, 
    function (err) { 
    console.log('Error: ' + err); 
    }, 
    function() { 
    console.log('Completed'); 
    }); 

本地RxJS樣品:

var Rx = require('rxjs'), 
    superagent = require('superagent'), 
    Observable = require('rxjs').Observable; 

var array = [ 
    'https://httpbin.org/ip', 
    'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/10', 
    'https://httpbin.org/delay/2', 
    'https://httpbin.org/delay/2', 
    'https://httpbin.org/delay/1', 
]; 

let start = (new Date()).getTime(); 

var source = Rx.Observable.from(array) 
    .mergeMap(httpGet, null, 1) 
    .timestamp() 
    .map(stamp => [stamp.timestamp - start, stamp.value]); 

function httpGet(apiUrl) { 
    return Observable.create((observer) => { 
    superagent 
     .get(apiUrl) 
     .end((err, res) => { 
      if (err) { 
       return observer.onError(err); 
      } 
      let data, 
       inspiration; 
      data = JSON.parse(res.text); 
      inspiration = data; 
      observer.next(inspiration); 
      observer.complete(); 
     }); 
    }); 
} 

var subscription = source.subscribe(
    function (x) { 
    console.log('=====', x, '======'); 
    }); 
1

可以使用mergeMap操作來執行HTTP請求並平整響應爲合成觀測。 mergeMap有一個可選concurrent參數,使用它可以指定同時訂閱可觀測量的最大數量(即,HTTP請求):

let source = Rx.Observable 
    .fromArray(array) 
    .mergeMap(httpGet, 1); 

注意,對於指定爲1concurrent一個mergeMap相當於concatMap

您問題中的代碼一次發送所有請求的原因歸結於map運算符中的httpGet函數的調用。 httpGet返回一個承諾,並承諾不懶 - 只要httpGet被調用,該請求將被髮送。

與上面的代碼中,httpGet只會在mergeMap執行,如果有超過併發請求指定數量較少的調用。

上面的代碼將分別從組成的observable發出每個響應。如果你想組合成當所有請求已經完成時發射陣列響應,你可以使用toArray操作:

let source = Rx.Observable 
    .fromArray(array) 
    .mergeMap(httpGet, 1) 
    .toArray(); 

你也應該看看馬丁在他的評論中已經引用的食譜。

+0

謝謝。你的回答給了我一些提示,弄清楚我的樣本需要一些調整。還有那個rx!= rxjs。 rx似乎過時了,併發性不起作用。檢查上面的例子。 – Diego

2

Rx.Observable.fromPromise可能對您有幫助。擴大對cartant的回答,試試這個,在concurrent被指定爲1

Rx.Observable.from(array) 
    .mergeMap(url => Rx.Observable.fromPromise(rp.get(url)), 1) 
    .subscribe(x => console.log(x)) 

對於基於時間的控制,這是我能想到的:

Rx.Observable.from(array) 
    .bufferCount(2) 
    .zip(Rx.Observable.timer(0, 1000), x => x) 
    .mergeMap(x => Rx.Observable.from(x) 
    .mergeMap(url => Rx.Observable.fromPromise(rp.get(url))) 
    .subscribe(x => console.log(x)) 
相關問題