2017-07-18 145 views
3

假設我正在開發一個聊天應用程序。我觀察到的threads$發出每ň秒線程的陣列,觀察到offline$當一個線程下線成爲了通知,可觀察online$當一個線程成爲網上通知:組合/合併可觀察對象

enum ConnectionStatus { Offline = 0, Online } 

interface Thread { 
    id: string; 
    status: ConnectionStatus 
} 

const threads$ = Observable 
    .interval(n) 
    .switchMap(() => Observable.create((observer: Observer<Array<Thread>>) => 
     getThreads((threads: Array<Thread>) => observer.next(threads)))); 

const online$ = Observable.create((observer: Observer<Thread>) => 
    onOnline((threadId: string) => observer.next({ 
     id: threadId, 
     status: ConnectionStatus.Online 
    }))); 

const offline$ = Observable.create((observer: Observer<Thread>) => 
    onOffline((threadId: string) => observer.next({ 
     id: threadId, 
     status: ConnectionStatus.Offline 
    }))); 

我想下面這這些流合併規則threads$應該發射陣列每n秒,但每當online$offline$發出,我想抓住的threads$最新值(Array<Threads>),並通過改變一個線程的狀態圖,併發出立即映射集合。

我已經失去了跟蹤,其中Rx的combineLatestmergeMapzip和類似的,所以我會很感激,如果有人可以幫助我實現在這種情況下合併(更多的RX-方式)

回答

0

這應該發出Array<Thread>每次threads$online$offline$發出時立即發出

const threadUpdate$ = Observable.merge(
    threads$, 
    Observable.merge(online$, offline$) 
     .withLatestFrom(threads$, 
      (thread, threads) => threads.map(t => { 
       if(t.id === thread.id) { 
        t.status = thread.status 
       } 
      }))); 

注意threads$將繼續排放,甚至可能在發射,有可能,同時由於合併online$/offline$流。

1

我想你可以把它像這樣使用multicast()

const stop$ = Observable.merge(online$, offline$); 
threads$ 
    .multicast(new Subject(), obs => Observable.merge(obs, obs.takeUntil(stop$).takeLast(1).map(...))) 
    .subscribe(...); 

顯然我沒有測試它,但也許它會把你正確的方向。