2016-06-09 16 views
2

我正在學習RxJS。我有關於下面的代碼片段的問題。RxJS將新項目添加到陣列流中未發佈到訂戶

var arr = [1,2,3,4,5]; 
var arraysource = Observable.from(arr); 

arr.push(6); 
var subscription = arraysource.subscribe(
    x => console.log('onNext: %s', x), 
    e => console.log('onError: %s', e), 
    () => console.log('onCompleted')); 

arr.push(7); 

當我運行上面的代碼時,我得到以下輸出。

onNext: 1 
onNext: 2 
onNext: 3 
onNext: 4 
onNext: 5 
onNext: 6 
onCompleted 

我的問題是爲什麼第七元素沒有得到公佈,訂閱後添加?是因爲輸入流是冷流並且它的讀取項目是同步的嗎?因此,添加post onComplete火災永遠不會到達觀察者?有人可以對此行爲有所瞭解嗎?

回答

2

你可以創建自己的可觀測得到你正在尋找

的Rx 5測試版的功能(改變nextcompleteonNextonCompleted Rx之中4)

var source = Rx.Observable.create(function (observer) { 
    [1,2,3,4,5].forEach(item => observer.next(item)); 

    observer.next(6); 
    // observer.complete() // <-- remove comment to allow observable to complete 
    // Any cleanup logic might go here 
    return function() { 
     console.log('disposed'); 
    }; 
}); 


var subscription = source.subscribe(
    function (x) { console.log('onNext: %s', x); }, 
    function (e) { console.log('onError: %s', e); }, 
    function() { console.log('onCompleted'); } 
); 

例如http://jsbin.com/datuqoniyo/edit?js,console

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#creating-a-sequence-from-scratch

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create

2

arraySource是一個冷源,因此當您訂閱它時懶惰地實例化它的值序列。那一刻,你的數組有6個值。是的,因爲Rx.Observable.from(array)會同步生成其值,所以在可觀察結果完成後您將看到所有6個值。

這就是說,在一個真實的代碼中,改變一個介於observable定義的參數並不是一個好的實踐。它使得你的程序非常難以推理,因爲這種變異的影響將取決於大量事物(這裏的同步性,可觀察操作符的實現等)。如果你有一個隨着時間的推移而改變價值的論證,那麼你在概念上就有一個可觀察的事物。所以你可以將它建模爲一個可觀察對象,以便於操縱和推理。