2017-09-28 58 views
1

我正在處理某些正在記錄來自隊列的數據。將隊列處理成Observable非常容易,以便我的代碼中可以有多個端點接收隊列中的信息。包含確認的觀察模式

此外,我可以確定信息按順序到達。這個位很好地工作,因爲Observables確保這一點。 但是,一個棘手的問題是,我不希望觀察者在下一件事完成處理之前得到通知。但Observer完成的處理是異步的。

作爲一個更具體的例子,可能很簡單。想象一下我的隊列包含URL。我在我的代碼中將這些視爲一個Observable。我訂閱了一個觀察者,他的工作是獲取URL並將內容寫入磁盤(這是一個人爲的例子,所以不要對這些具體問題置疑)。重要的一點是提取和保存是異步的。我的問題是,我不希望觀察者在Observable獲得「下一個」URL之後才能完成前面的處理。

但Observer接口上的next的調用返回void。所以Observer沒有辦法讓我回到已經完成異步任務的我。

有什麼建議嗎?我懷疑有可能是某種可以編碼的操作符,它會基本上忽略未來的值(將它們排列在內存中?),直到它以某種方式知道Observer已經準備好了。但我希望按照某種既定模式已經存在這樣的事情。

回答

1

類似用途的情況下,我跑進前

window.document.onkeydown=(e)=>{ 
 
    return false 
 
} 
 
let count=0; 
 
let asyncTask=(name,time)=>{ 
 
    time=time || 2000 
 
    return Rx.Observable.create(function(obs) { 
 
     setTimeout(function() { 
 
     count++ 
 
     obs.next('task:'+name+count); 
 
      console.log('Task:',count ,' ', time, 'task complete') 
 
     obs.complete(); 
 
     }, time); 
 
    }); 
 
} 
 

 
let subject=new Rx.Subject() 
 
let queueExec$=new Rx.Subject() 
 

 

 
Rx.Observable.fromEvent(btnA, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('A',4000)) 
 
}) 
 

 
Rx.Observable.fromEvent(btnB, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('B',4000)) 
 
}) 
 

 
Rx.Observable.fromEvent(btnC, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('C',4000)) 
 
}) 
 

 
    queueExec$.concatMap(value=>value) 
 
    .subscribe(function(data) { 
 
     console.log('onNext', data); 
 
    }, 
 
    function(error) { 
 
     console.log('onError', error); 
 
    },function(){ 
 
console.log('completed') 
 
});

+0

所以,如果我明白這一點,你有'asyncTask'返回一個Observable,而不是一個值。該Observable發佈了一個值,然後關閉。但是,當你運行'concatMap'將它們連接在一起時,效果是你將不會從下一個Observable獲得值,直到第一個*完成*(當異步完成時它會這樣做)。這是一個有趣的想法。我發現了一些更適合我的案例,但這對於其他用例來說絕對是一個有趣的想法。 –

+0

有了這個模式,你可以在異步隊列中拋出任何東西,只要它是可觀察的。在某些情況下,它更加靈活 –

1

你描述的如 「反壓」 的聲音。您可以在RxJS 4文檔https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md中閱讀它。但是這裏提到RxJS 5中不存在的操作符。例如,請查看「受控觀察點」,它應該引用您需要的內容。

我想你可以實現與concatMap相同,Subject的實例:

const asyncOperationEnd = new Subject(); 

source.concatMap(val => asyncOperationEnd 
    .mapTo(void 0) 
    .startWith(val) 
    .take(2) // that's `val` and the `void 0` that ends this inner Observable 
) 
    .filter(Boolean) // Always ignore `void 0` 
    .subscribe(val => { 
    // do some async operation... 
    // call `asyncOperationEnd.next()` and let `concatMap` process another value 
    }); 

來回你的描述,它實際上似乎是「觀察員」你提的類似主題的作品所以它將使更多的可能感覺要製作一個可以在任何Observable鏈中使用的自定義Subject類。