2017-04-26 60 views
1

嗨我有下面的代碼,我想知道如何防止主(上游)Observable從引發錯誤時被刪除。RxJS如何忽略錯誤與趕上並繼續

如何更改以下代碼以使所有數字都能顯示'4'?

我正在尋找一種通用模式解決方案,可以在其他情況下使用不同的操作員。這是我能想到的最簡單的情況。

const Rx = require('rxjs/Rx'); 

function checkValue(n) { 
    if(n === 4) { 
    throw new Error("Bad value"); 
    } 
    return true; 
} 
const source = Rx.Observable.interval(100).take(10); 

source.filter(x => checkValue(x)) 
    .catch(err => Rx.Observable.empty()) 
    .subscribe(v => console.log(v)); 

回答

0

您需要使用flatMap操作符來執行過濾。在這個例子中的flatMap中,我使用Observable.if()來進行過濾,因爲它保證了我一直都在返回observables。我相信你可以用其他方式做到這一點,但這對我來說是一個乾淨的實施。

const source = Rx.Observable.interval(100).take(10).flatMap((x)=> 
    Rx.Observable.if(() => x !== 4, 
    Rx.Observable.of(x), 
    Rx.Observable.throw("Bad value")) 
    .catch((err) => { 
     return Rx.Observable.empty() 
    }) 
); 

source.subscribe(v => console.log(v)); 
+0

'Observable.if()'似乎已經在v5中被刪除了。 – N13

3

你會想保留源觀察的運行,但如果你讓主事件發生錯誤流將摺疊整個觀察到的,你就不會再收到物品。

解決方案涉及創建一個分離的流,您可以過濾和捕獲,而不會讓上游管道崩潰。

const Rx = require('rxjs/Rx'); 
function checkValue(n) { 
    if(n === 4) { 
    throw new Error("Bad value"); 
    } 
    return true; 
} 
const source = Rx.Observable.interval(100).take(10); 

source 
    // pass the item into the projection function of the switchMap operator 
    .switchMap(x => { 
    // we create a new stream of just one item 
    // this stream is created for every item emitted by the source observable 
    return Observable.of(x) 
     // now we run the filter 
     .filter(checkValue) 
     // we catch the error here within the projection function 
     // on error this upstream pipe will collapse, but that is ok because it starts within this function and will not effect the source 
     // the downstream operators will never see the error so they will also not be effect 
     .catch(err => Rx.Observable.empty()); 
    }) 
    .subscribe(v => console.log(v)); 

你也可以使用傳遞到抓選擇重啓觀察到的源第二個參數,但是這將啓動它,就好像它沒有之前運行。

const Rx = require('rxjs/Rx'); 

function checkValue(n) { 
    if(n === 4) { 
    throw new Error("Bad value"); 
    } 
    return true; 
} 
const source = Rx.Observable.interval(100).take(10); 

source.filter(x => checkValue(x)) 
    .catch((err, source) => source) 
    .subscribe(v => console.log(v)); 

但是,這並沒有達到預期的效果。您將看到一個重複發射1..3的流,直到時間結束......或者關閉腳本。以先到者爲準。 (這是.retry()所必需的)

+0

謝謝!我幾乎想出了相同的解決方案,但是我最終只使用了flatMap,因爲它讀得更好,但與switchMap的功能相同。第二個解決方案是通過一個參數來捕捉,對我來說是新的,感謝分享它。 – devguy

+0

flatMap是mergeMap的別名。我建議切換到mergeMap或switchMap,因爲它在v5中更具慣用的Rx。 flatMap是一個遺產保留。 –