2016-02-01 54 views
19

對於我來說,它不是100%清楚如何RxJs 5 share()操作員工作,請參閱這裏latest docs。 Jsbin的問題hereRxJs 5 share()運算符是如何工作的?

如果我創建一個可觀察到的一系列的0至2,一秒分隔每個值:

var source = Rx.Observable.interval(1000) 
.take(5) 
.do(function (x) { 
    console.log('some side effect'); 
}); 

如果我創建了兩個訂戶此觀察到:

source.subscribe((n) => console.log("subscriptor 1 = " + n)); 
source.subscribe((n) => console.log("subscriptor 2 = " + n)); 

我得到這在控制檯中:

"some side effect ..." 
"subscriptor 1 = 0" 
"some side effect ..." 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"some side effect ..." 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"some side effect ..." 
"subscriptor 2 = 2" 

我以爲每個訂閱都會訂閱相同的Observable,但似乎並非如此!它就像訂閱行爲創建一個完全獨立的Observable!

但如果share()經營者添加到源觀察到:

var source = Rx.Observable.interval(1000) 
.take(3) 
.do(function (x) { 
    console.log('some side effect ...'); 
}) 
.share(); 

然後我們得到這個:

"some side effect ..." 
"subscriptor 1 = 0" 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"subscriptor 2 = 2" 

這是我所期望的沒有share()

這是怎麼回事,share()運營商是如何工作的?每個訂閱是否創建一個新的Observable鏈?

回答

15

要小心您使用RxJS v5,而您的文檔鏈接似乎是RxJS v4。我不記得具體細節,但我認爲share運營商經歷了一些變化,特別是當涉及到完成和重新訂閱時,但不要聽我的話。

回到您的問題,正如您在研究中所顯示的那樣,您的期望與庫設計不符。觀察者懶洋洋地實例化他們的數據流,當用戶訂閱時具體地啓動數據流。當第二個訂閱者訂閱相同的可觀察數據時,另一個新的數據流就會開始,就好像它是第一個訂閱者(所以是的,每個訂閱都會創建一個新的觀察鏈)。這就是RxJS術語中創建的一個冷觀測值,這是RxJS可觀測值的默認行爲。如果你想要一個observable將數據發送給數據到達的用戶,那麼這是一個熱門的可觀察事物,而獲得熱門可觀察數據的一種方法是使用share運算符。

您可以在這裏找到說明的訂閱和數據流:Hot and Cold observables : are there 'hot' and 'cold' operators?(這對於RxJS v4有效,但大部分對v5有效)。

10

份額使得可觀察到的 「熱」,如果這2個條件得到滿足:

  1. 訂戶> 0
  2. 和可觀察到的還沒有完成

Scenario1的數目:訂戶數> 0並且在新訂閱之前未觀察到可觀察

var shared = rx.Observable.interval(5000).take(2).share(); 
var startTime = Date.now(); 
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 3000); 

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds 
// another emission for both observers at: startTime + 10 seconds 

情景2:在訂閱新訂閱之前,訂閱者數量爲零。成爲「冷」

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer1.unsubscribe(); 
}, 1000); 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time 
}, 3000); 
// observer2's onNext is called at startTime + 8 seconds 
// observer2's onNext is called at startTime + 13 seconds 

情況3:何時可觀察是否在新訂閱前完成。變得「冷」

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
     console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
    }; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 12000); 

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs 
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs