2016-03-09 19 views
2

此時,只有當所有壓縮的observable產生一個值時,zip纔會產生一個值。例如。從文檔:zip的替代方案,只要有任何observable發出值,就會產生值

通過使用選擇器功能每當 觀察序列的所有已經生產出元件

我正在尋找用於合併指定的觀察序列或者承諾爲一個可觀察到的 序列對於可以將可觀察值壓縮的觀察值,但會產生壓縮觀察值序列的陣列,其中如果全部產生值都沒有關係。

例如可以說我有打勾$,observ1,observ2 ..嘀$總是產生價值每x秒..而observ1和observ2僅從時間產生時間.. 我期待我流的樣子

[tick, undefined, observ2Res], 
[tick, undefined, undefined], 
[tick, observ1Res, observ2Res] 
... 
... 

它並不是最新的,因爲最新結合了一個給定觀察值的最新值。

回答

2

我相信buffer(或可能是sample)可能會讓你走上正軌。 buffer方法接受用於定義緩衝區邊界的Observable。得到的流發出了發出在該窗口中(例如,從RXJS文檔被盜buffer)的任何項目:

var source = Rx.Observable.timer(0, 50) 
    .buffer(function() { return Rx.Observable.timer(125); }) 
    .take(3); 

var subscription = source.subscribe(x => console.log('Next: ', x)); 

// => Next: 0,1,2 
// => Next: 3,4,5 
// => Next: 6,7 

所以我們現在有一個辦法讓所有流的發射事件在一定時間窗口。在你的情況,我們可以使用tick$來形容我們的採樣週期和observ1observ2是我們希望緩衝我們的基本流:

const buffered1 = observ1.buffer(tick$); 
const buffered2 = observ2.buffer(tick$); 

每個流會發出一次滴答$時期,會散發出來自底層流的所有排放項目清單(在此期間)。該buffered流將發出的數據是這樣的:

|--[]--[]--[1, 2, 3]--[]--> 

爲了得到你想要的輸出,我們可以選擇只查看每一個緩衝的結果的最新發出的項目,如果沒有發出的數據,我們可以通過空:

我說明
const buffered1 = observ1.buffer($tick).map(latest); 
const buffered2 = observ2.buffer($tick).map(latest); 

function latest(x) { 
    return x.length === 0 ? null : x[x.length - 1]; 
} 

以前的樣品流現在這個樣子:

|--null--null--3--null--> 

最後,我們可以zip這兩個流來獲得「最新」我們的tick$間隔期間發射數據:

const sampled$ = buffered1.zip(buffered2); 

sampled$流將發射從我們的observ1observ2流的最新數據在tick$窗口。這裏有一個示例結果:

|--[null, null]--[null, 1]--[1, 2]--> 
+0

卡爾文,試過這個,我正在瞄準。我不幸仍然可以upvote,但標記爲答案。 –

+0

很高興聽到它! –

相關問題