我有一個傳統的基於事件的對象,它看起來像是一個完美的RX:在連接到網絡源後,它在收到消息時引發事件,並可能終止一個單一的錯誤(連接死亡等)或(很少)表示不會有更多消息。這個對象也有一些預測 - 大多數用戶只對收到的消息的一部分感興趣,所以只有當知名消息子類型出現時纔會引發備用事件。在IConnectableObservable中包裝傳統對象
所以,學習更多有關反應式編程的過程中,我建立了以下包裝:
class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage>
{
private LegacyType _Legacy;
private IConnectableObservable<TopLevelMessage> _Impl;
public LegacyReactiveWrapper(LegacyType t)
{
_Legacy = t;
var observable = Observable.Create<TopLevelMessage>((observer) =>
{
LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm);
LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message));
LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted();
_Legacy.TopLevelMessage += tlmHandler;
_Legacy.Error += errHandler;
_Legacy.Complete += doneHandler;
return Disposable.Create(() =>
{
_Legacy.TopLevelMessage -= tlmHandler;
_Legacy.Error -= errHandler;
_Legacy.Complete -= doneHandler;
});
});
_Impl = observable.Publish();
}
public IDisposable Subscribe(IObserver<TopLevelMessage> observer)
{
return _Impl.RefCount().Subscribe(observer);
}
public IDisposable Connect()
{
_Legacy.ConnectToMessageSource();
return Disposable.Create(() => _Legacy.DisconnectFromMessageSource());
}
public IObservable<SubMessageA> MessageA
{
get
{
// This is the moral equivalent of the projection behavior
// that already exists in the legacy type. We don't hook
// the LegacyType.MessageA event directly.
return _Impl.RefCount()
.Where((tlm) => tlm.MessageType == MessageType.MessageA)
.Select((tlm) => tlm.SubMessageA);
}
}
public IObservable<SubMessageB> MessageB
{
get
{
return _Impl.RefCount()
.Where((tlm) => tlm.MessageType == MessageType.MessageB)
.Select((tlm) => tlm.SubMessageB);
}
}
}
一些關於如何使用它在別處的感覺...關閉...但不知何故。以下是示例使用情況,它可以工作但感覺很奇怪。我的測試應用程序的UI上下文是WinForms,但它並不重要。
// in Program.Main...
MainForm frm = new MainForm();
// Updates the UI based on a stream of SubMessageA's
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm);
LegacyType lt = new LegacyType();
// ... setup lt...
var w = new LegacyReactiveWrapper(lt);
var uiUpdateSubscription = (from msgA in w.MessageA
where SomeCondition(msgA)
select msgA).ObserveOn(frm).Subscribe(uiManager);
var nonUiSubscription = (from msgB in w.MessageB
where msgB.SubType == MessageBType.SomeSubType
select msgB).Subscribe(
m => Console.WriteLine("Got MsgB: {0}", m),
ex => Console.WriteLine("MsgB error: {0}", ex.Message),
() => Console.WriteLine("MsgB complete")
);
IDisposable unsubscribeAtExit = null;
frm.Load += (sender, e) =>
{
var connectionSubscription = w.Connect();
unsubscribeAtExit = new CompositeDisposable(
uiUpdateSubscription,
nonUiSubscription,
connectionSubscription);
};
frm.FormClosing += (sender, e) =>
{
if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); }
};
Application.Run(frm);
這工作 - 的形式推出,用戶界面更新,當我關閉它的訂閱得到清理和退出進程(這要是LegacyType的網絡連接仍處於連接狀態也不會做)。嚴格來說,只需要處理connectionSubscription
即可。然而,明確的Connect
感覺怪異。由於RefCount
應該爲你做,我試圖修改包裝,這樣,而不是在MessageA
和MessageB
中使用_Impl.RefCount
,並明確地連接後,我用this.RefCount
,而不是Subscribe
調用Load
處理程序。這有一個不同的問題 - 第二次訂閱觸發LegacyReactiveWrapper.Connect
的另一個電話。我認爲Publish
/RefCount
背後的想法是「先入式觸發器連接,最後一次處置連接」。
我想我有三個問題:
- 難道我從根本上誤解了
Publish
/RefCount
? - 有沒有一些首選的方式來實現
IConnectableObservable<T>
,不涉及委託給通過IObservable<T>.Publish
獲得的一個?我知道你不應該自己實現IObservable<T>
,但我不明白如何將連接邏輯注入IConnectableObservable<T>
,Observable.Create().Publish()
爲您提供。Connect
應該是冪等的嗎? - 對RX/reactive programming更熟悉的人會看樣本,瞭解包裝是如何使用的,並說「這很醜並且破損」,或者這看起來不像奇怪嗎?
+1作爲實現細節'Publish()。RefCount()'。太多次我看到默認情況下不這樣做的事件包裝。 Rx正在爲您訂閱,所以請確保您不會在事件中添加多餘的事件處理程序,從而不會加倍處理它們! :) –