2017-02-25 12 views
1

如果我使用的範圍和限制輸出與takeWhile:RXJS concatMap還應該懶拉?

​​

輸出爲:

range: 1 
res : 1 
range: 2 
res : 2 
range: 3 
res : 3 
range: 4 
res : 4 
range: 5 
res : 5 
range: 6 

的通過範圍產生值不都消耗。 6拉,沒有通過takeWhile,沒有更多的價值觀。現在,如果我有之間的concatMap

Rx.Observable 
    .range(1, 10) 
    .do(idx => console.log('range: ', idx)) 
    .concatMap(idx => { 
     var res = new Rx.Subject<number>(); 
     setTimeout(() => { 
      res.next(idx); 
      res.complete(); 
     }, 10); 
     return res; 
    }) 
    .takeWhile(idx => idx <= 5) 
    .subscribe(idx => console.log('res: ', idx)) 
    ; 

輸出是這樣的:

range: 1 
range: 2 
range: 3 
range: 4 
range: 5 
range: 6 
range: 7 
range: 8 
range: 9 
range: 10 
res: 1 
res: 2 
res: 3 
res: 4 
res: 5 

我所期望的,從生產範圍的值將在這裏也被限制。 concatMap保留了順序,所以當前一個observable完成時,只需拉取下一個值就可以了。但是所有的範圍錯誤都被拉了。
這是一個錯誤?或者什麼是真正的行爲。你能幫助理解嗎?

回答

2

range()產生的值被緩存在concatMap() operator的內部,然後逐一拉出。然後你使用setTimeout()異步發射值。一般來說,操作人員不會嘗試使用任何背壓,以便在準備好時從源頭排出所有物品。

請注意,即使使用Observable.of()和異步調度程序Scheduler.async,也可以實現相同。這使得來自Observable.of的排放發生在使其異步的新事件中。

const Observable = Rx.Observable; 
const Scheduler = Rx.Scheduler; 

Rx.Observable 
    .range(1, 10) 
    .do(idx => console.log('range: ', idx)) 
    .concatMap(idx => Observable.of(idx, Scheduler.async)) 
    .takeWhile(idx => idx <= 5) 
    .subscribe(idx => console.log('res: ', idx)); 

見現場演示:https://jsbin.com/xukolo/3/edit?js,console