2017-10-10 20 views
0

我有可觀察到的myObservableconcatMap()等效但異步像mergeMap()

let myObservable = Observable.of(2000, 1000) 

隨着concatMap():TOTAL TIME = 3000毫秒時間,導致原來的順序。

myObservable.concatMap(v => Rx.Observable.of(v).delay(v)) 
// concatMap: 2000, concatMap: 1000 

隨着mergeMap():TOTAL TIME = 2000米利斯,導致無法在原來的順序。

myObservable.mergeMap(v => Rx.Observable.of(v).delay(v)) 
// mergeMap: 1000, mergeMap: 2000 

我想辦法讓原始順序的結果就像concatMap,但調用每個嵌套的異步,而不是等待下一個嵌套的觀察到的完成觀察到:

// --- The behavior that I want --- 
myObservable.myCustomMap(v => Rx.Observable.of(v).delay(v)) 
// myCustomMap: 2000, myCustomMap: 1000 
// TOTAL TIME = 2000 millis 

是否有優雅的方案

編輯:我在尋找一個解決方案,也適用如果源(myObservable)是異步的,不僅對這種特殊的情況下同步。

+0

你應該使用'forkJoin'。寫一個適當的答案。 – Maxime

+0

你是否期望源'myObservable'異步發射?我的意思是用例比'Observable.of(2000,1000)'更復雜? – martin

+0

@martin是的,用例更復雜,可能是異步 – PerrierCitror

回答

0

您應該使用forkJoin來同時觸發所有的觀測值。

這裏是沒有註釋的例子:

const { Observable } = Rx; 

const obs$ = Observable 
    .of(3000, 3000, 1000) 
    .map(x => Observable.of(x).delay(x)); 

const allObsWithDelay$ = obs$.toArray(); 

const result$ = allObsWithDelay$ 
    .switchMap(arr => Observable.forkJoin(arr)); 

result$ 
    .do(console.log) 
    .subscribe(); 

並與解釋一樣:

const { Observable } = Rx; 

// source observable, emitting simple values... 
const obs$ = Observable 
    .of(3000, 3000, 1000) 
    // ... which are wrapped into a different observable and delayed 
    .map(x => Observable.of(x).delay(x)); 

// use a reduce to build an array containing all the observables 
const allObsWithDelay$ = obs$.toArray(); 

const result$ = allObsWithDelay$ 
    // when we receive the array with all the observable 
    // (so we get one event, with an array of multiple observables) 
    .switchMap(arr => 

    // launch every observable into this array at the same time 
    Observable.forkJoin(arr) 
); 

// display the result 
result$ 
    .do(console.log) 
    .subscribe(); 

使用這些值:3000, 3000, 1000整個過程服用3秒(最大他們,因爲他們」同時重新開火)

工作普拉克爾:https://plnkr.co/edit/IRgEhdjCmZSTc6hSaVeF?p=preview

編輯1:感謝@PierreCitror您指出toArray這是優於scan :)

+0

Nice Maxime,你可以通過調用toArray()來簡化reduce。 – PerrierCitror

+0

Riiiiiiight這是我一直在尋找,但不記得,thx @PerrierCitror:D – Maxime

+0

那麼,我不確定這正是OP正在尋找的。如果'obs $'是異步的,那麼'switchMap'將使所有的Observables取消訂閱並再次訂閱。例如,當你有5個遠程調用,'obs $'發出第6個,那麼所有前5個都會重複。 – martin

1

我會做這樣的:

myObservable 
    .mergeMap((val, i) => Observable.forkJoin(
    Observable.of(i), 
    Observable.of(v).delay(v) 
)) 
    .scan((acc, ([i, result])) => { 
    acc[i] = result; 
    return acc; 
    }, {}) 
    .filter(allResults => { 
    // Whatever goes here 
    Object.keys(allResults) // list indices of all finished responses 
    }) 

這將累積在單個對象的所有響應,其中每個響應被分配了一個索引,它在mergeMap之內。

然後在filter中,您可以編寫任何您想要的邏輯,以決定當前狀態是否應該進一步傳播(例如,您可以等到某個數量的響應到達或者其他任何狀態)。