2017-08-17 49 views
1

我想合併/合併多個觀察對象當他們每個完成時執行一個最後函數。 merge運營商似乎並行執行每個訂閱,這是我需要的,但如果他們中的任何一個拋出一個錯誤執行被暫停。rxjs5合併和錯誤處理

RxJS 4版有一個運營商mergeDelayError應該繼續執行,直到所有的人都完成了所有訂閱,但該運營商不版本5實現。

我應該回到不同的操作員嗎?

var source1 = Rx.Observable.of(1,2,3).delay(3000); 
var source2 = Rx.Observable.throw(new Error('woops')); 
var source3 = Rx.Observable.of(4,5,6).delay(1000); 

// Combine the 3 sources into 1 

var source = Rx.Observable 
    .merge(source1, source2, source3) 
    .finally(() => { 

    // finally is executed before all 
    // subscriptions are completed. 

    console.log('finally'); 

    }); 

var subscription = source.subscribe(
    x => console.log('next:', x), 
    e => console.log('error:', e), 
() => console.log('completed')); 

JSBin

回答

2

我想你可以通過使用catch()模擬相同的行爲。您只需將其追加到每一個源可觀察:

const sources = [source1, source2, source3].map(obs => 
    obs.catch(() => Observable.empty()) 
); 

Rx.Observable 
    .merge(sources) 
    .finally(...) 
    ... 
+0

謝謝 - 'catch'似乎不起作用。相反,我只是用'onErrorResumeNext'映射了所有東西,這在我的情況下是可以的。 – null

+0

@null catch()運算符不起作用如何?我認爲它應該工作... – martin

+0

我不知道爲什麼。看到這個JSBin:[http://jsbin.com/qaluyaq/edit?js,console](http://jsbin.com/qaluyaq/edit?js,console) – null

1

如果你不想吞嚥你的錯誤,但希望他們推遲到了最後,你可以:

const mergeDelayErrors = []; 
const sources = [source1, source2, source3].map(obs => obs.catch((error) => { 
    mergeDelayErrors.push(error); 
    return Rx.Observable.empty(); 
})); 

return Rx.Observable 
    .merge(...sources) 
    .toArray() 
    .flatMap(allEmissions => { 
    let spreadObs = Rx.Observable.of(...allEmissions); 
    if (mergeDelayErrors.length) { 
     spreadObs = spreadObs.concat(Rx.Observable.throw(mergeDelayErrors)); 
    } 
    return spreadObs; 
    }) 

您可能只想拋出第一個錯誤,或創建一個CompositeError。我不確定mergeDelayErrors最初在引發多個錯誤時的表現。

不幸的是,因爲這實現必須等待,直到發射前的錯誤完成所有觀測,它也一直等待,直到下一次發射前完成所有觀測。這很可能不是mergeDelayError的原始行爲,它應該以流的形式發出,而不是在最後發出。

+0

謝謝你,如你所願 –

0

我們可以避免通過收集錯誤並在端部發射它們阻塞流。

function mergeDelayError(...sources) { 
    const errors = []; 
    const catching = sources.map(obs => obs.catch(e => { 
    errors.push(e); 
    return Rx.Observable.empty(); 
    })); 
    return Rx.Observable 
    .merge(...catching) 
    .concat(Rx.Observable.defer(
    () => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors))); 
} 


const source1 = Rx.Observable.of(1,2,3); 
const source2 = Rx.Observable.throw(new Error('woops')); 
const source3 = Rx.Observable.of(4,5,6); 

mergeDelayError(source1, source2, source3).subscribe(
    x => console.log('next:', x), 
    e => console.log('error:', e), 
() => console.log('completed'));