2016-08-07 47 views
2

在AnuglarJS 2應用程序中,我想使鏈式promise中的Observable。由於承諾提供一次性結果,所以第一步是使用Observable.fromPromise(),然後使用mapAll()上的元可觀察來處理每個fromPromisecomplete。我在這裏找到有用的這太問題RxJS: How to have one Observer process multiple Observables?RxJS的執行順序可以從鏈接的promise中查看

從以上問題接受的答案解決簡單的事件,我已經準備很容易代替Observable.fromEvent(someEvent)使用Observable.fromPromise(someComposedPromise)自己的解決方案。不幸的是,雖然所有的工作對於簡單的單一承諾都很好,但問題出現在承諾由兩個承諾組成,因爲承諾的解決順序。

爲了簡化和病例隔離的緣故讓我們假設我們有一些現有的外部DumbCache(和什麼我想用的是Ionic 2 LocalStorage其中最簡單的變體看起來與此類似):

class DumbCache { 
    cache = {}; 

    get(key) { 
    return new Promise((resolve, reject) => { 
     var value = this.cache[key]; 
     resolve(value); 
    }); 
    } 

    set(key, value) { 
    return new Promise((resolve, reject) => { 
     this.cache[key] = value; 
     resolve(); 
    }); 
    } 
} 

然後上述的處理方法是:

class CacheValueObservable { 
    private cache: DumbCache; 

    constructor(private key: string) { 
    this.cache = new DumbCache(); 
    } 

    /* 
    * meta observer to handle promises from observables with all results and errors 
    * thanks to ReplaySubject(1) current value is available immediately after subscribing 
    */ 
    private _valueSource$$ = new Rx.ReplaySubject(1); 
    private _value$ = this._valueSource$$.mergeAll(); 

    public value$() { return this._value$; } 

    public updateValue(value) { 
    this._valueSource$$.next(
     Rx.Observable.fromPromise(
     this.cache.set(this.key, value) 
     .then(() => this.cache.get(this.key)) 
    ) 
    ); 
    } 
} 

現在爲以下代碼:

let cacheValueObservable = new CacheValueObservable("TEST_KEY"); 
cacheValueObservable.updateValue('VALUE 0'); 

cacheValueObservable.value$().subscribe(
    val => { 
     console.log('val:' + val); 
    }, 
    val => console.log('err', val.stack), 
    () => (console.log('complete')) 
); 

cacheValueObservable.updateValue('VALUE 1'); 
cacheValueObservable.updateValue('VALUE 2'); 

console.log('end'); 

結果是:

starting... 
end 
val:VALUE 2 
val:VALUE 2 
val:VALUE 2 

而很明顯,我想實現

starting... 
end 
val:VALUE 0 
val:VALUE 1 
val:VALUE 2 

完整這裏例如:http://jsbin.com/wiheki/edit?js,console

回答

1

雖然試圖表達的問題很好地描述我仍然在更好和更好地調查和理解這個問題。重點是第一個承諾this.cache.set(this.key, value)可能實際上立即解決,而Observable.fromPromise不能保證以下順序的所有鏈接承諾正在解決。

這個問題是由於每個鏈的最後一個承諾只有在最後一個鏈的第一個承諾(因此VALUE 2)被更改後才執行的事實造成的。

後所有的解決方案看起來從視圖代碼點很簡單,但它是不是很明顯是由兩個主要變化:

  • 推遲最初的承諾執行,直到mergeAll階段將使用Observable.defer代替Observable.fromPromise
  • 限制(或實際禁用)使用mergeAll(1)

因而工作液合併的承諾的併發看起來是這樣的:

class CacheValueObservable { 
    private cache: DumbCache; 

    constructor(private key: string) { 
    this.cache = new DumbCache(); 
    } 

    /* 
    * meta observer to handle promises from observables with all results and errors 
    * thanks to ReplaySubject(1) current value is available immediately after subscribing 
    */ 
    private _valueSource$$ = new Rx.ReplaySubject(1); 
    // disable merge concurrency 
    private _value$ = this._valueSource$$.mergeAll(1); 

    public value$() { return this._value$; } 

    public updateValue(value) { 
    this._valueSource$$.next(
     // defer promise resolution to ensure they will be fully resolved 
     // one by one thanks to no concurrency in mergeAll 
     Rx.Observable.defer(() => 
     this.cache.set(this.key, value) 
     .then(() => this.cache.get(this.key)) 
    ) 
    ); 
    } 
} 

這裏就是活生生的例子: http://jsbin.com/bikawo/edit?html,js,console