2017-08-27 25 views
0

我的問題是遞歸的,我不知道當我的Observable complete()在遞歸函數中。RxJS:隨着時間的推移動態組合多個觀測值

基本上,我有一個功能是廢除一個帶有分頁功能的網站,它返回的是一個Observable,它將.next(data)方法在每個頁面上發現的「解析」項目遞歸地執行,直到我們發現自己在最後一頁然後我們觸發我們的主題的.complete()方法。

我使用concat(Observable)方法來遞歸地連接新的Observable和當前的Observable,但它似乎不工作,當我訂閱我的observable時,我只能得到第一頁的項目,這讓我猜測concat()方法不適用於我的情況。

這裏是我的功能代碼的簡化版本。

```

crawlList(url) { 
    let obsSubject = new Subject(); 
    request(url, function (err, response, body) { 
     //check if everything is alright... 
     //parsing data... 
     obsSubject.next(parsedData); 
     //now we check if we can still paginate (we are not in the last page) 
     //if so, we concat our observable with the new observable recursivly 
     obsSubject.concat(crawList(url)) 
     //else ,(we are in the last page 
     obsSubject.complete(); 
    }); 
    return obsSubject; 
} 

```

+0

'concat'不修改可觀察/主題,它返回一個新的。嘗試'obsSubject = obsSubject.concat(crawList(url));' – noppa

回答

2

在使用Subject小號,除非你是一定你不能跟運營商做一般避免的。

在這種情況下,我預計expand會在這裏工作。前一個流的結果反饋給運算符,以便遞歸執行。

喜歡的東西:

// Convert the callback into an Observable 
const rxRequest = Rx.Observable.bindNodeCallback(
    request, 
    (response, body) => ({response, body}) 
);  

// Feed the initial data set, by default we should continue 
Observable.of({url: baseUrl, shouldContinue: true}) 
    .expand(({url, shouldContinue}) => { 
    // Return an empty stream to cancel the Observable 
    // note this is from the *previous* iteration 
    if (!shouldContinue) 
     return Rx.Observable.empty(); 

    // This is how we call the newly created method 
    return rxRequest(url) 
     .map(({response, body}) => 
     // Parse data 
     // Check if you should continue or not 
     // We still need to emit this data so we can't cancel until the next 
     // go-around 
     ({url: newUrl, data: parsedData, shouldContinue}) 
    ); 
    }) 
    // Downstream only cares about the data part 
    .pluck('data')