在AnuglarJS 2應用程序中,我想使鏈式promise中的Observable。由於承諾提供一次性結果,所以第一步是使用Observable.fromPromise()
,然後使用mapAll()
上的元可觀察來處理每個fromPromise
的complete
。我在這裏找到有用的這太問題RxJS: How to have one Observer process multiple Observables?RxJS的執行順序可以從鏈接的promise中查看
從以上問題接受的答案解決簡單的事件,我已經準備很容易代替Observable.fromEvent(someEvent)
使用Observable.fromPromise(someComposedPromise)
自己的解決方案。不幸的是,雖然所有的工作對於簡單的單一承諾都很好,但問題出現在承諾由兩個承諾組成,因爲承諾的解決順序。
爲了簡化和病例隔離的緣故讓我們假設我們有一些現有的外部DumbCache
(和什麼我想用的是Ionic 2 LocalStorage
其中最簡單的變體看起來與此類似):
class DumbCache {
cache = {};
get(key) {
return new Promise((resolve, reject) => {
var value = this.cache[key];
resolve(value);
});
}
set(key, value) {
return new Promise((resolve, reject) => {
this.cache[key] = value;
resolve();
});
}
}
然後上述的處理方法是:
class CacheValueObservable {
private cache: DumbCache;
constructor(private key: string) {
this.cache = new DumbCache();
}
/*
* meta observer to handle promises from observables with all results and errors
* thanks to ReplaySubject(1) current value is available immediately after subscribing
*/
private _valueSource$$ = new Rx.ReplaySubject(1);
private _value$ = this._valueSource$$.mergeAll();
public value$() { return this._value$; }
public updateValue(value) {
this._valueSource$$.next(
Rx.Observable.fromPromise(
this.cache.set(this.key, value)
.then(() => this.cache.get(this.key))
)
);
}
}
現在爲以下代碼:
let cacheValueObservable = new CacheValueObservable("TEST_KEY");
cacheValueObservable.updateValue('VALUE 0');
cacheValueObservable.value$().subscribe(
val => {
console.log('val:' + val);
},
val => console.log('err', val.stack),
() => (console.log('complete'))
);
cacheValueObservable.updateValue('VALUE 1');
cacheValueObservable.updateValue('VALUE 2');
console.log('end');
結果是:
starting...
end
val:VALUE 2
val:VALUE 2
val:VALUE 2
而很明顯,我想實現
starting...
end
val:VALUE 0
val:VALUE 1
val:VALUE 2
完整這裏例如:http://jsbin.com/wiheki/edit?js,console