2017-12-18 258 views
4

警告:RxJS newb在這裏。RxJS:如何將多個嵌套的觀測數據與緩衝區結合使用

這裏是我的挑戰:

  1. onUnlink$觀察到發射...
  2. 立即開始從onAdd$觀察到的捕獲值,最多1秒(我會打電話給此分區onAddBuffer$) 。
  3. 查詢數據庫(創建doc$觀察到的)來獲取我們將使用來匹配的onAdd$值之一
  4. 如果從onAddBuffer$觀察到的一個值的doc$值相匹配的模式,不排放
  5. 如果沒有值從onAddBuffer$觀察到的doc$值匹配,或者如果onAddBuffer$觀察到從來沒有發出,發出doc$

這是我最好的猜測:

// for starters, concatMap doesn't seem right -- I want a whole new stream 
const docsToRemove$ = onUnlink$.concatMap(unlinkValue => { 

    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })) 

    const onAddBuffer$ = onAdd$ 
    .buffer(doc$) // capture events while fetching from db -- not sure about this 
    .takeUntil(Rx.Observable.timer(1000)); 

    // if there is a match, emit nothing. otherwise wait 1 second and emit doc 
    return doc$.switchMap(doc => 
    Rx.Observable.race( 
     onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()), 
     Rx.Observable.timer(1000).mapTo(doc) 
    ) 
); 
}); 

docsToRemove$.subscribe(doc => { 
    // should only ever be invoked (with doc -- the doc$ value) 1 second 
    // after `onUnlink$` emits, when there are no matching `onAdd$` 
    // values within that 1 second window. 
}) 

這總是會發出EmptyObservable。也許這是因爲single在沒有匹配時似乎排出undefined,並且我預計它在沒有匹配時根本不會發出? find發生同樣的事情。

如果我將single更改爲filter,則什麼都不發出。

FYI:這是文件系統事件的重命名方案 - 如果add事件的unlink事件的1秒鐘之內,接着給發出文件哈希匹配,什麼也不做,因爲它是一個rename。否則,它是一個真正的unlink,它應該發出要刪除的數據庫文檔。

+0

這聽起來像你在這裏構建了一個非常討厭的競賽條件。超時通常不是解決這個問題的好方法 - 如果事情由於某種原因需要更長的時間,則會丟失數據。 –

+1

是的,這裏肯定有潛在的競爭條件。它最終可能會挫敗這種做法。但它似乎是學習rxjs的好機會。 – glortho

回答

3

這是我的猜測,你怎麼可以這樣做:

onUnlink$.concatMap(unlinkValue => { 
    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share(); 
    const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$); 
    const onAddBuffer$ = onAdd$.buffer(bufferDuration$); 

    return Observable.forkJoin(onAddBuffer$, doc$) 
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ }); 
}); 

single()運營商是有點棘手,因爲它發出的源可觀察完成僅後的謂語功能相匹配的項目(或發出有兩個項目或沒有匹配項目時出錯)。

race()也很棘手。如果其中一個源Observable完成並且沒有發出任何值race()將剛剛完成並且不會發出任何東西。我前一段時間曾經報道過,這是正確的行爲,請參閱https://github.com/ReactiveX/rxjs/issues/2641
我想這是你的代碼出了什麼問題。

另請注意,.mapTo(Rx.Observable.empty())會將每個值映射到Observable的實例。如果您想忽略所有值,則可以使用filter(() => false)ignoreElements()運算符。