2014-09-03 31 views
1

我有一些代碼段如下Rxjs - 只有第一個觀察者可以看到從observable.share數據()

var videosNeedFix = Rx.Observable.fromArray(JSON.parse(fs.readFileSync("videoEntries.json"))).share(); 

videosNeedFix.count().subscribe(function(count){ //subscrption A 
    console.log(count + " in total"); 
}); 


videosNeedFix.subscribe(function(videoEntry){ //subscription B 
    console.log(videoEntry.id, videoEntry.name, videoEntry.customFields); 
}); 

的videoEntries.json是的VideoEntry對象的JSON序列化陣列。我期待訂閱A和訂閱B都能收到由videosNeedFix observable發出的數據。

但是,根據控制檯日誌,只有訂閱A將接收數據,但不會收到subscriptionB。如果我交換製作這兩個訂閱的順序,只有subscriptionB纔會看到這些數據。觀察數據如何僅向第一次訂閱發佈數據?

+4

在第一'訂閱( )',共享訂閱在第二個'subscribe()'被調用之前被創建和完成。所以第二個用戶只收到一個完成的事件。你可以看到,這是因爲添加延遲將使它工作...''Rx.Observable.fromArray(JSON.parse(fs.readFileSync(「videoEntries.json」)))。delay(1000).share( );' – 2014-09-03 01:05:02

+0

這是[Cold vs Hot Observable](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-vs-hot-observables)問題。或者代替共享,請使用發佈/收集。 – mtsr 2014-11-08 22:13:52

回答

0

這是一個很好的用例(也許是唯一的 - 見To Use Subject Or Not To Use Subject?)的Rx.Subject

請看下面的例子。此代碼(與.delay()黑客在評論中提到的)的工作,但似乎有點哈克對我說:

let stream$ = Rx.Observable 
     .return(updatesObj) 
     .map(obj => Object.assign({}, obj.localData, obj.updates)) 
     .delay(1) //Hacky way of making it work 
     .share() 

    stream$ 
     .flatMap(obj => Observable.fromPromise(AsyncStorage.setItem('items', JSON.stringify(obj)))) 
     .catch(Observable.return(false)) 
     .subscribe() 

     stream$ 
     .subscribe(obj => dispatch(dataIsReady(obj))) 

例與Rx.Subjects:

let subjS = new Rx.Subject() 

    let stream$ = subjS 
    .map(obj => Object.assign({}, obj.localData, obj.updates)) 
    .share() 

    stream$ 
    .flatMap(obj => Observable.fromPromise(AsyncStorage.setItem('items', JSON.stringify(obj)))) 
    .catch(Observable.return(false)) 
    .subscribe() 

    stream$ 
    .subscribe(obj => dispatch(dataIsReady(obj))) 

    subjS.onNext(updatesObj)