我很努力將節點流轉換爲Rxjs觀察對象。將節點流轉換爲Rx.js觀察對象
當我嘗試使用1個URL時,流式傳輸本身效果很好。但是,當我嘗試通過URLS數組映射相同的函數時,出現錯誤。
我正在使用Rx.Node將流轉換爲Observable。
這是我目前正試圖
// data_array is an array of 10 urls that I'm scraping data from.
let parentStream = Rx.Observable.from(data_array);
parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function createStream(url){
return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
}
但這是輸出×10
RefCountObservable {
source:
ConnectableObservable {
source: AnonymousObservable { source: undefined, __subscribe: [Function] },
_connection: null,
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
_subject:
Subject {
isDisposed: false,
isStopped: false,
observers: [],
hasError: false } },
_count: 0,
_connectableSubscription: null }
我首先想到flatMap會(在data_array中的網址數)工作,因爲它是在一個可觀察的平展observables ....但是當我嘗試flatMap,我得到這個:
Complete
Error TypeError: unknown type returned
但是,如果我這樣做:
這適用於1個URL,但我不能捕獲所有的網址在data_array中的一個流。
let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'))
我覺得我誤解的東西,不僅是因爲它清除不工作的多個URL,但即使它在第二個例子中的工作....我得到「完成」之前,首先所有數據進來。
顯然,我誤解了一些東西。任何幫助將是美好的。謝謝。
* UPDATE *
我嘗試了不同的路徑,它的工作原理,但不使用節點流。節點流將是理想的,所以仍然想要使上述示例工作。
我接下來使用的方法是圍繞我的網頁抓取功能,這是刮以下承諾。這是有效的,但結果是十個巨大的數組包含每個數組中每個URL的所有數據。我真正想要的是一組對象,我可以在數據對象通過時組成一系列轉換。
這裏是不同的,但工作方式:
let parentStream = Rx.Observable.from(data_array);
parentStream.map(url => {
return Rx.Observable.defer(() => {
return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]);
})
})
.concatAll()
.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function scrape(url, selector, scope) {
return new Promise(
(resolve, reject) => x(
url,
selector,
scope
)((error, result) => error != null ? reject(error) : resolve(result))
);
}