2012-06-25 22 views
9

我最近一直在閱讀關於IObservable。到目前爲止,我已經看過各種SO問題,並觀看了他們可以做什麼的視頻。我所想的整個「推動」機制非常出色,但我仍然試圖弄清楚究竟是什麼。從我的閱讀材料來看,我認爲IObservable是一種可以「觀看」的方式,而IObservers是「觀察者」。使用IObservable而不是事件

所以現在我要嘗試在我的應用程序中實現這個功能。在我開始之前,有幾件事情我想要堅持下去。我已經看到,IObservable與IEnumerable是相反的,但是,我無法真正看到我的特定實例中可以併入我的應用程序中的任何位置。

目前,我大量使用活動,以至於我看到「管道工程」開始變得難以管理。我會認爲,IObservable可以幫助我在這裏。

考慮下面的設計,這是我的包裝在我的I/O我的應用程序(僅供參考,我通常要處理的字符串)內:

我有一個基本接口被稱爲IDataIO

public interface IDataIO 
{ 
    event OnDataReceived; 
    event OnTimeout: 
    event OnTransmit; 
} 

現在,我現在有實現此接口的三個班,每個類以某種方式都利用異步方法調用,將某些類型的多線程處理:

public class SerialIO : IDataIO; 
public class UdpIO : IDataIO; 
public class TcpIO : IDataIO; 

有每個類包裹到我的最後一類,稱爲IO的的單個實例(其也實現IDataIO - 粘附到我的策略模式):

public class IO : IDataIO 
{ 
    public SerialIO Serial; 
    public UdpIO Udp; 
    public TcpIO Tcp; 
} 

我已經利用策略模式來封裝這些三個類,以便在運行時在不同的IDataIO實例之間進行更改時,使其對最終用戶「不可見」。正如你所想象的那樣,這導致了背景中相當多的「事件管理」。

那麼,在我的情況下,我該如何利用'推'通知?我不想訂閱事件(DataReceived等),而只想將數據推送給任何感興趣的人。我有點不確定從哪裏開始。我仍然試圖玩弄Subject的想法/通用類,以及這個(ReplaySubject/AsynSubject/BehaviourSubject)的各種化身。有人可以請這個啓發我(也許參照我的設計)?或者這不是一個理想的適合IObservable

PS。隨時糾正任何我的「誤解」 :)

回答

8

觀測量是偉大的代表數據流的,所以你的DataReceived活動將很好地模式,以可觀察到的圖案,像IObservable<byte>IObservable<byte[]>東西。您還可以得到便利的OnErrorOnComplete的附加優勢。

就實施而言,很難說您的具體情況,但我們經常使用Subject<T>作爲基礎來源,並呼籲OnNext推送數據。也許像

// Using a subject is probably the easiest way to push data to an Observable 
// It wraps up both IObservable and IObserver so you almost never use IObserver directly 
private readonly Subject<byte> subject = new Subject<byte>(); 

private void OnPort_DataReceived(object sender, EventArgs e) 
{ 
    // This pushes the data to the IObserver, which is probably just a wrapper 
    // around your subscribe delegate is you're using the Rx extensions 
    this.subject.OnNext(port.Data); // pseudo code 
} 

然後,您可以暴露通過屬性的主題:

public IObservable<byte> DataObservable 
{ 
    get { return this.subject; } // Or this.subject.AsObservable(); 
} 

您可以用IObservable<T>IDataIO更換你DataReceived活動,讓每個戰略類處理自己的數據爲準的方式,他們需要並推送到Subject<T>

在另一邊,誰訂閱了可觀察到,然後能夠自行處理它像一個事件(只需使用一個Action<byte[]>),也可以用SelectWhereBuffer等在流上進行一些真正有用的工作。

private IDataIO dataIo = new ... 

private void SubscribeToData() 
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes); 
} 

private void On16Bytes(IList<byte> bytes) 
{ 
    // do stuff 
} 

ReplaySubject/ConnectableObservable s爲偉大的,當你知道你的用戶將被遲到的黨,但仍然需要追趕上的所有事件。源緩存所有推送的內容,併爲每個訂閱者重放所有內容。只有你可以說這是否是你實際需要的行爲(但要小心,因爲它會緩存一切會顯着增加你的內存使用量)。

當我學習的Rx我發現的Rx http://leecampbell.blogspot.co.uk/博客系列是非常豐富的,瞭解理論(職位是現在有點過時,API已經更改,因此注意這一點)

+0

嗨RichK,交易能否請您闡述一下主題屬性?這是如何宣佈的?而這個類的用戶,他們到底會'訂閱'IObservable'DataReceived'。 – Simon

+0

@Simon我做了一些編輯,讓我知道如果你仍然不確定:) – RichK

+0

謝謝,這清除了一些東西了。只是1件事,我假設'dataIo.DataObservable'是'公共IObservable DataObservable''? – Simon

4

這是絕對是觀察者的理想情況。類可能會看到最大的改進。首先,讓我們改變接口來使用observables,看看這個組合類變得多麼簡單。

public interface IDataIO 
{ 
    //you will have to fill in the types here. Either the event args 
    //the events provide now or byte[] or something relevant would be good. 
    IObservable<???> DataReceived; 
    IObservable<???> Timeout; 
    IObservable<???> Transmit; 
} 

public class IO : IDataIO 
{ 
    public SerialIO Serial; 
    public UdpIO Udp; 
    public TcpIO Tcp; 

    public IObservable<???> DataReceived 
    { 
     get 
     { 
      return Observable.Merge(Serial.DataReceived, 
            Udp.DataReceived, 
            Tcp.DataReceived); 
     } 
    } 

    //similarly for other two observables 
} 

邊注:您可能注意到,我改變了接口成員的名字。在.NET中,事件通常被命名爲<event name>,並且引發它們的函數被稱爲On<event name>

對於生產類,您有幾個選擇取決於實際的來源。假設您在SerialIO中使用.NET SerialPort類,並且DataReceived返回IObservable<byte[]>。由於SerialPort已經有一個接收數據的事件,所以你可以直接使用它來創建你需要的可見性。

public class SerialIO : IDataIO 
{ 
    private SerialPort _port; 

    public IObservable<byte[]> DataRecived 
    { 
     get 
     { 
      return Observable.FromEventPattern<SerialDataReceivedEventHandler, 
               SerialDataReceivedEventArgs>(
         h => _port.DataReceived += h, 
         h => _port.DataReceived -= h) 
        .Where(ep => ep.EventArgs.EventType == SerialData.Chars) 
        .Select(ep => 
          { 
           byte[] buffer = new byte[_port.BytesToRead]; 
           _port.Read(buffer, 0, buffer.Length); 
           return buffer; 
          }); 
     } 
    } 
} 

對於您沒有現有事件源的情況,您可能需要使用RichK建議的主題。他的回答涵蓋了這種使用模式,所以我不會在這裏重複。

你沒有說明你如何使用這個接口,但根據用例,這些類上的其他函數自己返回IObservable可能更有意義,並完全消除這些「事件」。使用基於事件的異步模式,您必須將事件與您調用以觸發工作的函數分開,但使用observables時,您可以從函數返回它們,以使其更加明顯地表示您正在訂閱的內容。該方法還允許從每個呼叫返回的可觀測量發送OnErrorOnCompleted消息來表示操作結束。根據你使用的組合課程,我不認爲這在這個特定情況下是有用的,但是應該記住這一點。

+0

+1謝謝,這裏有一些很好的信息。就在'Merge()'語句中 - 這將一系列觀察值合併爲1 - 在我的應用程序中,我只會一次使用其中一個(serial/udp/tcp),並允許用戶在不同的接口(因此我的困境與事件管道)。這裏推薦合併觀察對象嗎?欣賞指向異步串行事件的鏈接:) – Simon

+0

@Simon合併它們時,可以將「Where」應用於包裝類中的觀察對象,該對象檢查「CurrentSource」或類似屬性以過濾掉不需要的消息。 –

+1

@Simon或者你可以(最好)停止在其他可觀察對象上生成消息,這樣'Merge'一次只能接收一個消息。如果是這樣的話,最好使用'Switch'而不是'Merge'。 – yamen

0

一般的答案:事件反應事件:

因爲我iobservables一般用屬性更改啓動:

如果事件是財產上的變化可以使用:

IObservable<string> obs= user 
     .ItemPropertyChanged(u=>u.Name,false) 

通過參考此代碼項目文章中的dll: https://www.codeproject.com/Articles/731032/The-best-of-reactive-framework-

(據說作者還掙扎着看到反應擴展的實用程序)

訂閱這個「爲propertyChange」事件:

obs.Subscribe((args) => 
     { 
      //Do stuff with args.Sender,args.NewValue and args.OldValue 
     }); 

,或者只在屬性變化感興趣的是有NuGet包用類似的方法RXX,使用像這樣:

IObservable<string> obs=Observable2.FromPropertyChangedPattern(() => obj.Name) 

這包含的其他擴展方法

很大10

,或者當事件排除屬性更改/你不想從http://rxwiki.wikidot.com/101samples#toc6


執行INotifyPropertyChanged

class ObserveEvent_Simple 
{ 
    public static event EventHandler SimpleEvent; 
    static void Main() 
    {   
     IObservable<string> eventAsObservable = Observable.FromEventPattern(
      ev => SimpleEvent += ev, 
      ev => SimpleEvent -= ev); 
    } 
} 

類似於U /吉迪恩Engelberth 或使用該致力於事件轉化爲反應事件的代碼項目文章

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

這也與弱訂閱(有什麼取決於你從OnSubscribe方法做什麼用了IDisposable可能/可能不會是一個問題)

相關問題