我想你誤解了Rx的一些問題。可觀察對象沒有過濾器,而且您不能「過濾」添加和移除過濾器。基於訂閱者的觀察數據也不可觀察。
而是建立一個呼叫鏈。您從源觀察開始,如addRecord
中的一個和removeRecord
事件中的一個。然後,您將這些可觀測量鏈接到Rx中的各種operators形成新的觀測值,最終您訂閱最終可觀測值。訂閱將激活整個鏈條,並且當源事件觸發時,所有操作員將觸發並最終該事件(如果未被過濾)達到subscribe
。
你實際上可以做你在Rx中描述的東西。使用switchMap
這個可以讓你將一個序列投影到另一個序列並每次切換到新序列的操作符都可以相對簡單地更改可觀察對象上的過濾器。例如filterSource.switchMap(filterFunction => Obs-1.filter(filterFunction))
。即使比這更簡單,您可以退訂第一次訂閱並重新設置Rx鏈。然而,使用構建函數會使等式中出現很多雜耍狀態。
但是,我強烈懷疑你並不需要這種複雜的行爲。你想要的東西可以像這樣存檔:
var Src-1 = fromEvent(dataSource, 'addRecord') // create the first source
var Src-2 = fromEvent(dataSource, 'removeRecord') // and the other source
var Obs-1 = Src-1.combineLatest(Src-2) // combine both sources
.filter(e => someCondition(e)) // filter the source
var Obs-2 = Obs-1.mergeMap(e => someOtherCondition(e) ? Change(e) : Rx.Observable.of(e)) // on someOtherCondition, either transform the source with the `Change(e)` function. Or keep it unchanged with `of(e)`
var Obs-3 = Obs-2.filter(e => anotherCondition(e)) // Filter again
var sub = Obs-3.subscribe() // activate the sequence.
感謝您糾正我。每個觀察者(OBS-1,OBS-2等)都可以有自己的訂閱,它將接收數據。實際上觀察將被動態添加。我們如何做到這一點? – user3130446
你如何動態接收observables?你不是指訂閱嗎?我認爲你需要更具體地瞭解你想要的以及你的來源是什麼導致了這些變化。 Rx是關於反應式程序設計的,在反應式程序設計中,瞭解數據來自哪裏,而不是如何操作它很重要。一旦你的信息來源清晰,其餘的就會落實到位,現在你似乎在尋找錯誤的解決方案。 – Dorus