2013-11-22 47 views
1

我有一個傳統的基於事件的對象,它看起來像是一個完美的RX:在連接到網絡源後,它在收到消息時引發事件,並可能終止一個單一的錯誤(連接死亡等)或(很少)表示不會有更多消息。這個對象也有一些預測 - 大多數用戶只對收到的消息的一部分感興趣,所以只有當知名消息子類型出現時纔會引發備用事件。在IConnectableObservable中包裝傳統對象

所以,學習更多有關反應式編程的過程中,我建立了以下包裝:

class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage> 
{ 
    private LegacyType _Legacy; 
    private IConnectableObservable<TopLevelMessage> _Impl; 
    public LegacyReactiveWrapper(LegacyType t) 
    { 
     _Legacy = t; 
     var observable = Observable.Create<TopLevelMessage>((observer) => 
     { 
      LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm); 
      LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message)); 
      LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted(); 

      _Legacy.TopLevelMessage += tlmHandler; 
      _Legacy.Error += errHandler; 
      _Legacy.Complete += doneHandler; 

      return Disposable.Create(() => 
      { 
       _Legacy.TopLevelMessage -= tlmHandler; 
       _Legacy.Error -= errHandler; 
       _Legacy.Complete -= doneHandler; 
      }); 
     }); 

     _Impl = observable.Publish(); 
    } 

    public IDisposable Subscribe(IObserver<TopLevelMessage> observer) 
    { 
     return _Impl.RefCount().Subscribe(observer); 
    } 

    public IDisposable Connect() 
    { 
     _Legacy.ConnectToMessageSource(); 
     return Disposable.Create(() => _Legacy.DisconnectFromMessageSource()); 
    } 

    public IObservable<SubMessageA> MessageA 
    { 
     get 
     { 
      // This is the moral equivalent of the projection behavior 
      // that already exists in the legacy type. We don't hook 
      // the LegacyType.MessageA event directly. 
      return _Impl.RefCount() 
        .Where((tlm) => tlm.MessageType == MessageType.MessageA) 
        .Select((tlm) => tlm.SubMessageA); 
     } 
    } 

    public IObservable<SubMessageB> MessageB 
    { 
     get 
     { 
      return _Impl.RefCount() 
        .Where((tlm) => tlm.MessageType == MessageType.MessageB) 
        .Select((tlm) => tlm.SubMessageB); 
     } 
    } 
} 

一些關於如何使用它在別處的感覺...關閉...但不知何故。以下是示例使用情況,它可以工作但感覺很奇怪。我的測試應用程序的UI上下文是WinForms,但它並不重要。

// in Program.Main... 

MainForm frm = new MainForm(); 

// Updates the UI based on a stream of SubMessageA's 
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm); 

LegacyType lt = new LegacyType(); 
// ... setup lt... 

var w = new LegacyReactiveWrapper(lt); 

var uiUpdateSubscription = (from msgA in w.MessageA 
          where SomeCondition(msgA) 
          select msgA).ObserveOn(frm).Subscribe(uiManager); 

var nonUiSubscription = (from msgB in w.MessageB 
         where msgB.SubType == MessageBType.SomeSubType 
         select msgB).Subscribe(
          m => Console.WriteLine("Got MsgB: {0}", m), 
          ex => Console.WriteLine("MsgB error: {0}", ex.Message), 
          () => Console.WriteLine("MsgB complete") 
         ); 

IDisposable unsubscribeAtExit = null; 
frm.Load += (sender, e) => 
{ 
    var connectionSubscription = w.Connect(); 
    unsubscribeAtExit = new CompositeDisposable(
           uiUpdateSubscription, 
           nonUiSubscription, 
           connectionSubscription); 
}; 

frm.FormClosing += (sender, e) => 
{ 
    if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); } 
}; 


Application.Run(frm); 

這工作 - 的形式推出,用戶界面更新,當我關閉它的訂閱得到清理和退出進程(這要是LegacyType的網絡連接仍處於連接狀態也不會做)。嚴格來說,只需要處理connectionSubscription即可。然而,明確的Connect感覺怪異。由於RefCount應該爲你做,我試圖修改包裝,這樣,而不是在MessageAMessageB中使用_Impl.RefCount,並明確地連接後,我用this.RefCount,而不是Subscribe調用Load處理程序。這有一個不同的問題 - 第二次訂閱觸發LegacyReactiveWrapper.Connect的另一個電話。我認爲Publish/RefCount背後的想法是「先入式觸發器連接,最後一次處置連接」。

我想我有三個問題:

  1. 難道我從根本上誤解了Publish/RefCount
  2. 有沒有一些首選的方式來實現IConnectableObservable<T>,不涉及委託給通過IObservable<T>.Publish獲得的一個?我知道你不應該自己實現IObservable<T>,但我不明白如何將連接邏輯注入IConnectableObservable<T>Observable.Create().Publish()爲您提供。 Connect應該是冪等的嗎?
  3. 對RX/reactive programming更熟悉的人會看樣本,瞭解包裝是如何使用的,並說「這很醜並且破損」,或者這看起來不像奇怪嗎?

回答

2

我不確定您需要直接暴露連接,因爲您有。我會簡化如下,使用Publish().RefCount()作爲封裝的實現細節;它會導致傳統連接只能根據需要進行。這裏第一個用戶連接,最後一個連接斷開。還要注意,這在所有用戶中正確共享一個RefCount,而您的實現使用每個消息類型RefCount,這可能不是預期的。用戶無需顯式連接:

public class LegacyReactiveWrapper 
{ 
    private IObservable<TopLevelMessage> _legacyRx; 

    public LegacyReactiveWrapper(LegacyType legacy) 
    { 
     _legacyRx = WrapLegacy(legacy).Publish().RefCount(); 
    } 

    private static IObservable<TopLevelMessage> WrapLegacy(LegacyType legacy) 
    { 
     return Observable.Create<TopLevelMessage>(observer => 
     { 
      LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm); 
      LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message)); 
      LegacyCompleteHandler doneHandler = sender => observer.OnCompleted(); 

      legacy.TopLevelMessage += tlmHandler; 
      legacy.Error += errHandler; 
      legacy.Complete += doneHandler; 
      legacy.ConnectToMessageSource(); 

      return Disposable.Create(() => 
      { 
       legacy.DisconnectFromMessageSource(); 
       legacy.TopLevelMessage -= tlmHandler; 
       legacy.Error -= errHandler; 
       legacy.Complete -= doneHandler; 
      }); 
     }); 
    } 

    public IObservable<TopLevelMessage> TopLevelMessage 
    { 
     get 
     { 
      return _legacyRx; 
     } 
    } 

    public IObservable<SubMessageA> MessageA 
    { 
     get 
     { 
      return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageA) 
          .Select(tlm => tlm.SubMessageA); 
     } 
    } 

    public IObservable<SubMessageB> MessageB 
    { 
     get 
     { 
      return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageB) 
          .Select(tlm => tlm.SubMessageB); 
     } 
    } 
} 

另外一個發現是,當它的用戶數量Publish().RefCount()將下降底層認購達到0。通常我只用Connect過這個選擇,當我需要甚至維持訂閱時發佈源上的用戶數減少到零(並且可能稍後再次恢復)。很少需要這樣做 - 只有當連接比在需要的時候持有訂閱資源更昂貴的時候。

+0

+1作爲實現細節'Publish()。RefCount()'。太多次我看到默認情況下不這樣做的事件包裝。 Rx正在爲您訂閱,所以請確保您不會在事件中添加多餘的事件處理程序,從而不會加倍處理它們! :) –

1
  1. 你的理解並不完全錯,但你似乎有誤解的一些點。

    您似乎認爲在同一源IObservable上多次調用RefCount將導致共享引用計數。他們不;每個實例保持自己的計數。因此,您正在對_Impl進行多次訂閱,每次調用一次訂閱或調用消息屬性。

    您也可期待的是讓_Impl一個IConnectableObservable某種原因導致稱爲你Connect方法(因爲你似乎感到驚訝,你需要調用Connect在你的消費代碼)。所有Publish確實會導致發佈對象的訂閱者(從。發佈()調用)共享單個訂閱到基礎源observable(在這種情況下,您調用Observable.Create創建的對象)。

    通常,我會立即看到Publish和RefCount一起使用(例如source.Publish().RefCount())以獲得上述共享訂閱效果,或者無需調用Connect即可啓動對原始源的訂閱,從而可以冷觀察到熱度。但是,這依賴於爲所有訂戶使用從.Publish()。RefCount()返回的相同對象(如上所述)。

  2. 您的Connect實施看起來很合理。我不知道如果連接應該是冪等的,但我不會親自期待它。如果你希望這樣做,你只需要跟蹤調用它的回報價值,以獲得合適的平衡。

    我不認爲你需要使用發佈你的方式,除非有一些理由,以避免多個事件處理程序被附加到遺留對象。如果您確實需要避免這種情況發生,我會建議將_Impl更改爲普通IObservable,並按PublishRefCount

  3. 您的MessageAMessageB屬性有可能成爲用戶混淆的來源,因爲它們返回一個IObservable,但仍需要調用基礎對象上的Connect來開始接收消息。我會將它們改爲IConnectableObservables,它以某種方式委託給原始Connect(此時冪等性討論變得更相關)或者放棄屬性,並讓用戶在需要時自己製作(相當簡單的)投影。

+0

核心問題是我誤解了RefCount的工作原理。冪等性的問題與一個可能需要的解決方法有關,如果它工作了我認爲它的工作原理,而不是它的實際工作方式。你有一個有效的點3);我將它們包括在內,以瞭解與Publish()共享的連接是如何工作的,並且因爲我們有許多關心「僅A」或「僅B」的代碼,儘管它們出現在同一個流中。 – twon33