2016-11-24 240 views
5

目的:可觀察到流

我使用StackExchange Redis的客戶端。我的目標是從客戶端公開的Pub Sub Subscriber創建一個Observable流,然後可以依次通過Observables支持訂閱,其中每個訂閱者都有自己的通過LINQ過濾的訂閱。 (出版工作按計劃進行,這個問題純粹是圍繞訂閱事件流上的特定通道。)

背景:

我使用Redis的酒吧子作爲事件源CQRS應用程序的一部分。具體用例是將事件發佈給多個訂閱者,然後更新各種閱讀模型,發送電子郵件等。

這些訂戶都需要過濾它們處理的事件類型,爲此我期待使用Rx帶有LINQ的.Net(Reactive Extensions)到 在事件流上提供了一個過濾標準,以便有效地處理僅對感興趣的事件作出反應。使用這種方法可以省去註冊處理程序的事件總線實現,並且允許我通過部署1-n向系統添加新投影。每個微服務都有1-n訂閱了具有自己的事件流的可觀察對象具體過濾器。

我曾嘗試:

1)I已經創建了一個類從ObservableBase繼承,重寫SubscribeCore方法,該方法從觀測量接收的訂閱請求,將它們存儲在ConcurrentDictionary,並且作爲各Redis的通知到達從通道中,遍歷已註冊的Observable訂閱者,並調用傳遞RedisValue的OnNext方法。

2)我創建了一個Subject,它也接受來自Observables的訂閱,並調用它們的OnNext方法。再次,受試者的使用似乎被許多人所詬病。

問題:

我試圖做的功能(至少在表面上),與各種性能層面的方法,但feel like a hack,那我不是它的目的的方式使用的Rx。

我看到很多意見,應儘可能使用內置的Observable方法,例如Observable.FromEvent,但似乎不可能使用StackExchange Redis客戶端訂閱API,至少在我眼中。

我也明白,用於接收流,並轉發到多個觀察員的優選的方法是使用一個ConnectableObservable,這似乎被設計爲非常場景我面對(每個微服務將在內部具有1 -n訂閱觀察對象)。目前,我無法理解如何將ConnectableObservable連接到來自StackExchange Redis的通知,或者如果它提供的實際好處超過了Observable

UPDATE

雖然完成是不是在我的情況的問題(處理是罰款),錯誤處理重要;例如隔離在一個用戶中檢測到的錯誤,以防止所有訂閱終止。

回答

8

這裏是你可以用它來創建一個ISubscriberIObservable<RedisValue>擴展方法和RedisChannel

public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel) 
{ 
    return Observable.Create<RedisValue>(async (obs, ct) => 
    { 
     await subscriber.SubscribeAsync(channel, (_, message) => 
     { 
      obs.OnNext(message); 
     }).ConfigureAwait(false); 

     return Disposable.Create(() => subscriber.Unsubscribe(channel)); 
    }); 
} 

由於沒有Redis的渠道完成後所產生的IObservable將永遠不會完成,但您可能降IDisposable訂閱取消訂閱Redis頻道(這將由許多Rx運營商自動完成)。

用途可能是像這樣:

var subscriber = connectionMultiplexer.GetSubscriber(); 

var gotMessage = await subscriber.WhenMessageReceived("my_channel") 
    .AnyAsync(msg => msg == "expected_message") 
    .ToTask() 
    .ConfigureAwait(false); 

或按你的例子​​:

var subscriber = connectionMultiplexer.GetSubscriber(); 

var sendEmailEvents = subscriber.WhenMessageReceived("my_channel") 
    .Select(msg => ParseEventFromMessage(msg)) 
    .Where(evt => evt.Type == EventType.SendEmails); 

await sendEmailEvents.ForEachAsync(evt => 
{ 
    SendEmails(evt); 
}).ConfigureAwait(false); 

其他微服務可以過濾不同。

+0

整潔,我喜歡的方法,+1。 (在接受答案之前,我傾向於等待幾天到幾天,以便讓其他人有機會。)如我的問題所述,您是否發現通過Observable使用ConnectableObservable有什麼優勢;大多數來源將ConnectableObservable描述爲針對讀取流的場景而設計,然後發佈到多個輔助觀察者。儘管在我的場景中完成並不是問題(處置很好),但錯誤處理很重要;隔離它們以防止所有訂閱終止。 – dmcquiggin

+0

如果您想與多個觀察者共享相同的訂閱,那麼您將需要使用'IConnectableObservable',這可以通過使用'.Publish()'或調用另一個'.Multicast()'實現來完成。 'WhenMessageReceived'的結果。它不應該改變'WhenMessageReceived'是如何實現的。 – Lukazoid

+0

通過'WhenMessageReceived'返回一個'IConnectableObservable',以及10個併發的訂閱到'IConnectableObservable',是的,測試過的,性能很好。好的解決方案 – dmcquiggin