2016-10-01 73 views
1

我想使用windowCount將我的可觀察值分組到組中,併爲每個組的每個值發送請求。
然後,連接這些組,以便下一個組的請求在當前組請求未完成之前不會啓動。
問題是有些值會被跳過。

這是我的代碼。
(我沒有在這裏做實際的ajax調用,但Observable.timer應該爲一個例子)。windowCount下降值

Observable.interval(300) 
    .take(12) 
    .windowCount(3) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(Math.random() * 1500).mapTo(v) 
     ); 
    }) 
    .do(v => console.log(v)) 
    .finally(() => console.log('fin')) 
    .subscribe(); 

我試圖通過手動創建組來替換windowCount。它完美的作品。沒有值被跳過。

Observable.interval(900) 
    .take(4) 
    .map(i => Observable.interval(300).take(3).map(j => j + i * 3)) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(Math.random() * 1500).mapTo(v) 
     ); 
    }) 
    .do(v => console.log(v)) 
    .finally(() => console.log('fin')) 
    .subscribe(); 

我的印象是,windowCount應該以相同的方式對發射值進行分組。
但是,顯然它做了別的。

我會很感激任何解釋它的行爲。

謝謝!

回答

0

缺少的值是使用熱點可觀測值(Observable.interval(300))的結果,該值繼續輸出未存儲以供使用的值。

以下是您的代碼的一個稍微簡化的版本,它還會記錄發出數字的次數。我用1替換了Math.random(),以便輸出是確定性的。我也裝在jsbin的代碼,你可以試試:

https://jsbin.com/burocu/edit?js,console

Observable.interval(300) 
    .do(x => console.log(x + ") hot observable at: " + (x * 300 + 300))) 
    .take(12) 
    .windowCount(3) 
    .do(observe3 => {observe3.toArray() 
     .subscribe(x => console.log(x + " do window count at: " + (x[2] * 300 + 300)));}) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(1 * 1500).mapTo(v) 
     ) 
     .do(v => console.log(v + " merge map at: " + (v * 300 + 300 + 1500))); 
    }) 
    .finally(() => console.log('fin windowCount')) 
    .subscribe(); 

它導致下面的輸出。請注意,熱門可觀察對象繼續前進,而其他操作員仍在處理中。

這是什麼給你的印象是價值被放棄。你可以看到windowCount(3)正在做什麼你認爲,但不是當你想到

"0) hot observable at: 300" 
"1) hot observable at: 600" 
"2) hot observable at: 900" 
"0,1,2 do window count at: 900" 
"3) hot observable at: 1200" 
"4) hot observable at: 1500" 
"5) hot observable at: 1800" 
"3,4,5 do window count at: 1800" 
"0 merge map at: 1800" 
"6) hot observable at: 2100" 
"1 merge map at: 2100" 
"7) hot observable at: 2400" 
"2 merge map at: 2400" 
"8) hot observable at: 2700" 
"6,7,8 do window count at: 2700" 
"9) hot observable at: 3000" 
"10) hot observable at: 3300" 
"11) hot observable at: 3600" 
"9,10,11 do window count at: 3600" 
" do window count at: NaN" 
"8 merge map at: 4200" 
"fin windowCount" 

編輯:進一步解釋...

windowCount(3)後有來concatMap通話。 concatMapmapconcatAll的組合。

concatAll

加入每可觀察由源(高階 可觀察)發射的,以串行方式。它僅在前一個內部Observable完成(強調添加)之後訂閱每個內部 Observable ,並且 將其所有值合併到返回的observable中。

因此,看上面的輸出,我們看到第一個windowCount(3)值[0,1,2]在1800和2400之間發射。當[3,4,5]被髮射,所述第二windowCount(3)值[3,4,5]在1800 concatAll發射

通知是沒有準備好訂閱因爲前內可觀察到未完成尚未。所以這些值有效地下降了。

接下來,注意前一內可觀察到[0,1,2]在2400 concatAll訂閱完成在2400

的下一個值出現在2700的值8(300毫秒後的訂閱開始在2400)。然後,由於從訂閱開始點2400開始300的間隔延遲,然後1500的定時器延遲(即2400 + 300 + 1500 = 4200),則在4200由mergeMap輸出值8。

在這一點之後,序列就完成了,所以不會再發射任何值。

如果需要更多說明,請添加評論。