2016-03-17 75 views
0

我現在發現RxJS,並且我的第一次嘗試之一是嘗試實現API請求的費率限制。利用RxJS限制費率

不知何故,我失去了一些東西,並在輸出只是「未定義」。

我在做什麼錯了?

const Rx = require('rx'); 
const request = require('request'); 

function f() { 
    return Rx.Observable.from(arguments); 
} 

function expand(condensedId) { 
    console.log('requesting', condensedId) 
    return f(request(INDEX_URL + '/' + condensedId)); 
} 

const INDEX_URL = 'http://jsonplaceholder.typicode.com/posts'; 

var source = f([1,2,3,4,5,6,7]) 
    .windowWithTimeOrCount(5000, 2)//rate limitation, 2 every 5 seconds 
    .flatMap(condensed => expand(condensed)) 
    .map(entry => entry.title); 

var subscription = source.subscribe(
    function (x) { 
    console.log('title: %s', x); 
    }, 
    function (err) { 
    console.log('Error: %s', err); 
    }, 
    function() { 
    console.log('Completed'); 
    }); 

回答

0

Rx.Observable.from預計可迭代的,我不認爲request()的響應是一個可迭代。您可以將返回Promise或Observable的函數傳遞給flatMap,並且它將返回一個將釋放已解析數據的流。

因此,我們使用request-promise而不是request,並在expand函數中返回Promise。此外,我們使用cheerio庫來提取html標題:

const Rx = require('rx'); 
const request = require('request-promise'); 

// HTML parsing library 
const cheerio = require('cheerio'); 

function f() { 
    return Rx.Observable.from(arguments); 
} 

const INDEX_URL = 'http://jsonplaceholder.typicode.com/posts'; 

// Return an Observable of resolved responses 
function expand(condensedId$) { 
    return condensedId$.flatMap(id => request(INDEX_URL + '/' + id)); 
} 

var source = f([1,2,3,4,5,6,7]) 
    .windowWithTimeOrCount(5000, 2)//rate limitation, 2 every 5 seconds 
    .flatMap(condensed => expand(condensed)) 
    .map(body => { 
    const $ = cheerio.load(body); 
    return $('title').text(); 
    }); 
+0

謝謝您的回覆!然而,condensedId是一個可觀察值,而不是數組中的原始值,因此所有請求都會進入404 – maephisto

+0

您可以給出以下一個鏡頭嗎?我更新了擴展函數以接受Observable作爲參數。 –