目的:可觀察到流
我使用StackExchange Redis的客戶端。我的目標是從客戶端公開的Pub Sub Subscriber創建一個Observable流,然後可以依次通過Observables支持訂閱,其中每個訂閱者都有自己的通過LINQ過濾的訂閱。 (出版工作按計劃進行,這個問題純粹是圍繞訂閱事件流上的特定通道。)
背景:
我使用Redis的酒吧子作爲事件源CQRS應用程序的一部分。具體用例是將事件發佈給多個訂閱者,然後更新各種閱讀模型,發送電子郵件等。
這些訂戶都需要過濾它們處理的事件類型,爲此我期待使用Rx帶有LINQ的.Net(Reactive Extensions)到 在事件流上提供了一個過濾標準,以便有效地處理僅對感興趣的事件作出反應。使用這種方法可以省去註冊處理程序的事件總線實現,並且允許我通過部署1-n向系統添加新投影。每個微服務都有1-n訂閱了具有自己的事件流的可觀察對象具體過濾器。
我曾嘗試:
1)I已經創建了一個類從ObservableBase繼承,重寫SubscribeCore方法,該方法從觀測量接收的訂閱請求,將它們存儲在ConcurrentDictionary,並且作爲各Redis的通知到達從通道中,遍歷已註冊的Observable訂閱者,並調用傳遞RedisValue的OnNext方法。
2)我創建了一個Subject,它也接受來自Observables的訂閱,並調用它們的OnNext方法。再次,受試者的使用似乎被許多人所詬病。
問題:
我試圖做的功能(至少在表面上),與各種性能層面的方法,但feel like a hack
,那我不是它的目的的方式使用的Rx。
我看到很多意見,應儘可能使用內置的Observable方法,例如Observable.FromEvent,但似乎不可能使用StackExchange Redis客戶端訂閱API,至少在我眼中。
我也明白,用於接收流,並轉發到多個觀察員的優選的方法是使用一個ConnectableObservable,這似乎被設計爲非常場景我面對(每個微服務將在內部具有1 -n訂閱觀察對象)。目前,我無法理解如何將ConnectableObservable連接到來自StackExchange Redis的通知,或者如果它提供的實際好處超過了Observable。
UPDATE:
雖然完成是不是在我的情況的問題(處理是罰款),錯誤處理爲重要;例如隔離在一個用戶中檢測到的錯誤,以防止所有訂閱終止。
整潔,我喜歡的方法,+1。 (在接受答案之前,我傾向於等待幾天到幾天,以便讓其他人有機會。)如我的問題所述,您是否發現通過Observable使用ConnectableObservable有什麼優勢;大多數來源將ConnectableObservable描述爲針對讀取流的場景而設計,然後發佈到多個輔助觀察者。儘管在我的場景中完成並不是問題(處置很好),但錯誤處理很重要;隔離它們以防止所有訂閱終止。 – dmcquiggin
如果您想與多個觀察者共享相同的訂閱,那麼您將需要使用'IConnectableObservable',這可以通過使用'.Publish()'或調用另一個'.Multicast()'實現來完成。 'WhenMessageReceived'的結果。它不應該改變'WhenMessageReceived'是如何實現的。 – Lukazoid
通過'WhenMessageReceived'返回一個'IConnectableObservable',以及10個併發的訂閱到'IConnectableObservable',是的,測試過的,性能很好。好的解決方案 – dmcquiggin