2014-02-09 69 views

回答

20

對於您的具體例,想法是,從陣列中的每個值映射到可觀察到的一個延遲之後,將產生其結果,則級聯所得到的觀測值的流:

var delayedStream = Rx.Observable 
    .fromArray([1, 2, 3, 4, 5]) 
    .map(function (value) { return Rx.Observable.return(value).delay(2000); }) 
    .concatAll(); 

其它實例可能確實利用timerinterval。這取決於。例如,如果你的數組真的非常大,那麼上面的內存會造成相當大的內存壓力(因爲它創建了一個非常大的N觀測值)。下面是一個使用interval到懶惰地行走陣列的替代:

var delayedStream = Rx.Observable 
    .interval(2000) 
    .take(reallyBigArray.length) // end the observable after it pulses N times 
    .map(function (i) { return reallyBigArray[i]; }); 

這一個將來自陣列每2秒,直到它已經重複了在整個陣列上產生的下一個值。

6

雖然布蘭登的回答得到了主意,但這是一個立即生成第一個項目的版本,然後在以下項目之間放置時間。

var delay = Rx.Observable.empty().delay(2000); 

var items = Rx.Observable.fromArray([1,2,3,4,5]) 
    .map(function (x) { 
    return Rx.Observable.return(x).concat(delay); // put some time after the item 
    }) 
    .concatAll(); 

更新了新的RxJS:

var delay = Rx.Observable.empty().delay(2000); 

var items = Rx.Observable.fromArray([1,2,3,4,5]) 
    .concatMap(function (x) { 
    return Rx.Observable.of(x).concat(delay); // put some time after the item 
    }); 
+0

注意'現在return'是'of':https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md –

+0

現在,這可能是'concatMap'(猜測concatMap比2014年更新?)。這只是將它們全部排在一起(否則它們全部在一起延遲後) –

+0

例如。 '.concatMap(x => Observable.of(x).concat(Observable.empty()。delay(5000)))' –

17

我覺得用拉鍊生產更好,更可讀的代碼,仍然只用3觀測。

var items = ['A', 'B', 'C']; 

Rx.Observable.zip(
    Rx.Observable.fromArray(items), 
    Rx.Observable.timer(2000, 2000), 
    function(item, i) { return item;} 
) 
+0

到目前爲止最佳回答 – gropapa

4

同意zip是一種乾淨的方法。下面是可重複使用的函數來生成的間隔流爲一個數組:

function yieldByInterval(items, time) { 
    return Rx.Observable.from(items).zip(
    Rx.Observable.interval(time), 
    function(item, index) { return item; } 
); 
} 

// test 
yieldByInterval(['A', 'B', 'C'], 2000) 
    .subscribe(console.log.bind(console)); 

此基礎上farincz's answer,但是通過使用作爲.zip實例方法稍短。

此外,我使用Rx.Observable.from(),因爲Rx.Observable.fromArray()deprecated

4

對於RxJS 5:

Rx.Observable.from([1, 2, 3, 4, 5]) 
    .zip(Rx.Observable.timer(0, 2000), x => x) 
    .subscribe(x => console.log(x));