2012-02-12 90 views
1

我越來越熟悉RX和我的實驗項目中,我試圖創造概念上類似於一個簡單的命令總線命令總線:撰寫使用RX

class Bus 
{ 
    Subject<Command> commands; 
    IObservable<Invocation> invocations; 

    public Bus() 
    { 
     this.commands = new Subject<Command>(); 
     this.invocations = commands.Select(x => new Invocation { Command = x }).Publish(); 
    } 

    public IObserver<Command> Commands 
    { 
     get { return this.commands; } 
    } 

    public IObservable<Invocation> Invocations 
    { 
     get { return this.invocations; } 
    } 
} 

class Invocation 
{ 
    public Command Command { get; set; } 
    public bool Handled { get; set; } 
} 

的想法是,模塊可以安裝命令處理程序在啓動時使用Invocations屬性,並可以應用他們希望對其訂閱進行的任何過濾。另一方面,客戶可以通過調用Commands.OnNext(command)來觸發命令執行。

但是,我想總線提供一個保證,每個命令提交將由一個處理程序處理。也就是說,OnNext處理理想情況下,只要第一個處理程序將Invocation.Handled設置爲true,就立即終止處理,如果在OnNext()Invocation.Handled結束時仍然爲假,則應拋出異常。

我打得圍繞創建我自己的ISubject,和的IObservable實現IObserver但這種感覺「骯髒和廉價的」;)

我努力讓我的周圍將RX提供的成分動力頭。在構圖方面,我怎樣才能提供「一次確保」的保證?

感謝您提供的任何見解。

+0

這裏不是[責任鏈模式](http://en.wikipedia.org/wiki/Chain-of-responsibility_pattern)更適合嗎? – dtb 2012-02-12 06:24:55

+0

謝謝@dtb。一旦處理程序被選定,責任鏈可能對管理處理很有用。構建命令總線的方法比這個更簡單和有效。但是,我的問題更多的是關於RX和可組合性的概念。 – 2012-02-12 07:32:16

+0

你見過TPL數據流嗎? – Gabe 2012-02-12 13:47:15

回答

5

實際上,您通常在這裏得到了正確的想法。你只需要做實際的調度。爲此,將的SelectMany幫助:

class Bus 
{ 
    Subject<Command> commands; 
    Subject<Invocation> invocations; 

    // TODO: Instantiate me 
    List<Func<Command, bool>> handlerList; 

    public Bus() 
    { 
     this.commands = new Subject<Command>(); 
     this.invocations = new Subject<Invocation>(); 

     commands.SelectMany(x => { 
      // This FirstOrDefault() is just good ol' LINQ 
      var passedHandler = 
       handlerList.FirstOrDefault(handler => handler(x) == true); 

      return passedHandler != null ? 
       Observable.Return(new Invocation() { Command = x, Handled = true}) : 
       Observable.Throw<Invocation>(new Exception("Unhandled!")); 
     }).Multicast(invocations).Connect(); 
    } 

    /* ... snip ... */ 
} 

不過,說實話,這並不能真正證明Rx的力量,因爲它是同步執行處理程序列表。讓我們通過使這個完全無阻塞來讓這更加引人注目。

首先,我們將把我們的Func原型改爲Func<Command, IObservable<Invocation>>。這意味着一種接受命令併產生Future Invocation結果的方法(a-la Task<T>)。然後,我們可以得到相同的行爲尚未有我們的處理是異步通過這個選擇(通過文本區域編碼前):

commands.SelectMany(x => 
    handlerList.ToObservable() 
     .Select(h => Observable.Defer(() => h(x))) 
     .Concat() 
     .SkipWhile(x => x.Handled == false) 
     .TakeLast(1)) 
    .Multicast(invocations).Connect(); 

這是一個漂亮的研究生水平的使用Rx的,但這個想法是,對於每一個命令,我們將首先創建一個Stream處理程序並按順序運行它們(這就是Defer + Concat所做的),直到找到一個處理完畢的處理程序,然後取得最後一個處理程序。

SelectMany外部選擇命令流成爲未來結果流(即,類型爲IO<IO<Invocation>>然後將其平滑,因此它變成了結果流。

沒有任何阻止,非常簡潔,100%可測試,類型安全的代碼,只是表達了一個非常複雜的想法,如果命令式寫入會非常難看。這就是爲什麼Rx很酷。

+0

謝謝保羅。這需要一點時間才能讓我的頭部得到正確的處理,但是我正在展示我希望看到的構圖能力。將密切研究它。 我注意到@Gideon關於通知所有觀察者的觀點也已經形成了你的解決方案。基本上,IO訂閱不能成爲處理者使用總線自行提供的機制。 – 2012-02-13 22:40:48

+0

.TakeWhile(x => x.Handled == false) .TakeLast(1)我沒有嘗試這個,但是不會得到最後一個未處理的Observable,而不是第一次處理?我會做.SkipWhile(x => x.Handled == false)。取(1)。這是真的,還是我認爲它錯了? – nemenos 2012-02-14 09:34:30

+0

我認爲你實際上是對的 – 2012-02-14 21:35:00

0

雖然也許你可以做一個「正好一次」的科目,你不應該。接口(以及庫中的所有操作符)意味着將通知所有觀察者(忽略OnNext調用中可能出現異常的情況)。

你可以做的是創建一個備用組定義所需的語義接口:

interface IHandlableObservable<T> 
{ 
    //gets first chance at the notification 
    IDisposable SubscribeFirst(IHandlingObserver<T> observer); 
    //gets last chance at the notification 
    IDisposable SubscribeLast(IHandlingObserver<T> observer); 
    //starts the notification (possibly subscribing to an underlying IObservable) 
    IDisposable Connect(); 
} 

interface IHandlingObserver<T> 
{ 
    //return indicates if the observer "handled" the value 
    bool OnNext(T value); 
    void OnError(Exception ex); 
    void OnCompleted(); 
} 

然後,您可以定義允許您將一般可觀察到handlable觀測方法,這樣就可以保留大部分的標準RX運營商的邏輯。

+0

謝謝@Gideon。是的,我想知道這是不是一個非常適合用RX解決的問題。但我不確定我是否還沒有找到合成解決方案所需的思維轉換。 – 2012-02-12 21:44:30