2017-03-16 55 views
1

我期待合併流和刷新所有用戶。RxJs合併流

對於函數式編程而言,concat不會更新當前的observable,我不明白正確的方法。

這我怎麼想的事情發生:

var observable = Rx.Observable.from(['stream1']); 
var subscription = observable.subscribe(
    function(x) { 
     console.log('Next: ' + x); 
    }, 
    function(err) { 
     console.log('Error: ' + err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 
observable.concat(Rx.Observable.from(['stream2'])); 

此代碼僅讀取第一個數據流,然後去完成。 當concat它創建一個新的Observable,我真的不想要,因爲我已經訂閱了第一個。

什麼是正確的做法,因爲我甚至無法推入第一個可觀察的?

謝謝!

回答

2

你所要做的是不完全可能這樣,但你基本上有兩個選擇去,既涉及Subject的用法:

1)數據的人工發射

const obs$ = Rx.Observable.of("stream1"); 
const subj$ = new Rx.Subject(); 

Rx.Observable.merge(obj$, subj$) 
    .subscribe(
     x => console.log('Next: ' + x), 
     x => console.log('Error: ' + x), 
     () => console.log('Complete') 
    ); 

subj$.next("stream2"); 
subj$.next("stream3"); 

但是:在這種情況下,完整將永遠不會被調用,因爲Subject永遠不會自行完成 - 所以如果您需要觸發您的complete處理程序,則必須添加手冊subj$.complete();到最後。


2)多播通過主題

const obs$ = Rx.Observable.of("stream1"); 
const subj$ = new Rx.Subject(); 

subj$.subscribe(
    x => console.log('Next: ' + x), 
    x => console.log('Error: ' + x), 
    () => console.log('Complete') 
); 

obs$.subscribe(x => subj$.next(x)); 
const obs2$ = Rx.Observable.of("stream2"); 
obs2$.subscribe(x => subj$.next(x)); 

在這種情況下的Subject將基本上充當「代理」只會傳播的數據,但沒有無差錯或完全的觸發器。

這兩種解決方案都不是很「好」 - 但也許你可以更好地概述你的用例,我相信有一個適當的解決方案,它不涉及任何複雜的解決方法。


如果你只是想有一種方式來繼續提供數據形成永久觀察的,你應該使用BehaviorSubject - 它工作的方式,你可以在它發出的數據並訂閱它在同時:

class Service { 
    public data$ = new BehaviorSubject(someInitialDataOrNull); 

    public getData() { 
     makeSomeHttpCall() 
      .subscribe(data => data$.next(data)); 
    } 
} 

class Component { 
    constructor() { 
     theService.data$.subscribe(data => console.log(data)); 
    } 
} 

這裏是對BehaviorSubject的舊文檔的鏈接(它基本上還是以同樣的方式期待onNextnext現在,等...)

+0

非常感謝您的回答。我的想法是,服務將提供來自api或其他來源的數據的可觀察性。我的其他組件正在訂閱它,因此它需要從一開始就可以觀察到,並且數據可用時會饋送其他組件。這個目的的正確模式是什麼? – LeniM

+0

我已經更新了答案 – olsn

+0

非常感謝。你釘了它! – LeniM