警告:RxJS newb在這裏。RxJS:如何將多個嵌套的觀測數據與緩衝區結合使用
這裏是我的挑戰:
- 當
onUnlink$
觀察到發射... - 立即開始從
onAdd$
觀察到的捕獲值,最多1秒(我會打電話給此分區onAddBuffer$
) 。 - 查詢數據庫(創建
doc$
觀察到的)來獲取我們將使用來匹配的onAdd$
值之一 - 如果從
onAddBuffer$
觀察到的一個值的doc$
值相匹配的模式,不排放 - 如果沒有值從
onAddBuffer$
觀察到的doc$
值匹配,或者如果onAddBuffer$
觀察到從來沒有發出,發出doc$
值
這是我最好的猜測:
// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap(unlinkValue => {
const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue }))
const onAddBuffer$ = onAdd$
.buffer(doc$) // capture events while fetching from db -- not sure about this
.takeUntil(Rx.Observable.timer(1000));
// if there is a match, emit nothing. otherwise wait 1 second and emit doc
return doc$.switchMap(doc =>
Rx.Observable.race(
onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()),
Rx.Observable.timer(1000).mapTo(doc)
)
);
});
docsToRemove$.subscribe(doc => {
// should only ever be invoked (with doc -- the doc$ value) 1 second
// after `onUnlink$` emits, when there are no matching `onAdd$`
// values within that 1 second window.
})
這總是會發出EmptyObservable
。也許這是因爲single
在沒有匹配時似乎排出undefined
,並且我預計它在沒有匹配時根本不會發出? find
發生同樣的事情。
如果我將single
更改爲filter
,則什麼都不發出。
FYI:這是文件系統事件的重命名方案 - 如果add
事件的unlink
事件的1秒鐘之內,接着給發出文件哈希匹配,什麼也不做,因爲它是一個rename
。否則,它是一個真正的unlink
,它應該發出要刪除的數據庫文檔。
這聽起來像你在這裏構建了一個非常討厭的競賽條件。超時通常不是解決這個問題的好方法 - 如果事情由於某種原因需要更長的時間,則會丟失數據。 –
是的,這裏肯定有潛在的競爭條件。它最終可能會挫敗這種做法。但它似乎是學習rxjs的好機會。 – glortho