2016-09-29 45 views
1

我很努力將節點流轉換爲Rxjs觀察對象。將節點流轉換爲Rx.js觀察對象

當我嘗試使用1個URL時,流式傳輸本身效果很好。但是,當我嘗試通過URLS數組映射相同的函數時,出現錯誤。

我正在使用Rx.Node將流轉換爲Observable。

這是我目前正試圖

// data_array is an array of 10 urls that I'm scraping data from. 
let parentStream = Rx.Observable.from(data_array); 

parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createStream(url){ 
    return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 
} 

但這是輸出×10

RefCountObservable { 
source: 
    ConnectableObservable { 
    source: AnonymousObservable { source: undefined, __subscribe: [Function] }, 
_connection: null, 
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] }, 
_subject: 
    Subject { 
    isDisposed: false, 
    isStopped: false, 
    observers: [], 
    hasError: false } }, 
_count: 0, 
_connectableSubscription: null } 

我首先想到flatMap會(在data_array中的網址數)工作,因爲它是在一個可觀察的平展observables ....但是當我嘗試flatMap,我得到這個:

Complete 
Error TypeError: unknown type returned 

但是,如果我這樣做:

這適用於1個URL,但我不能捕獲所有的網址在data_array中的一個流。

let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 

stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')) 

我覺得我誤解的東西,不僅是因爲它清除不工作的多個URL,但即使它在第二個例子中的工作....我得到「完成」之前,首先所有數據進來。

顯然,我誤解了一些東西。任何幫助將是美好的。謝謝。

* UPDATE *

我嘗試了不同的路徑,它的工作原理,但不使用節點流。節點流將是理想的,所以仍然想要使上述示例工作。

我接下來使用的方法是圍繞我的網頁抓取功能,這是以下承諾。這是有效的,但結果是十個巨大的數組包含每個數組中每個URL的所有數據。我真正想要的是一組對象,我可以在數據對象通過時組成一系列轉換。

這裏是不同的,但工作方式:

let parentStream = Rx.Observable.from(data_array); 

parentStream.map(url => { 
    return Rx.Observable.defer(() => { 
     return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]); 
    }) 
}) 
    .concatAll() 
    .subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function scrape(url, selector, scope) { 
    return new Promise(
     (resolve, reject) => x(
      url, 
      selector, 
      scope 
     )((error, result) => error != null ? reject(error) : resolve(result)) 
    ); 
} 

回答

1

*解決方案* 我想通了。我已附上下面的解決方案:

取而代之的是使用RxNode,我選擇使用Rx.Observable.fromEvent()。

節點流發出事件,無論是新數據,錯誤還是完成。

所以from事件靜態運算符正在偵聽'data'事件併爲每個事件創建一個新的Observable。

然後我合併所有這些,並訂閱。代碼如下:

let parentStream = Rx.Observable.from(data_array); 
parentStream.map((url)=> { return createEventStream(url); }).mergeAll().subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createEventStream(url){ 
    return Rx.Observable.fromEvent(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')), 'data'); 
}