2013-06-23 12 views
3

我有一組類具有以下接口:通過幾個用戶「路由」事件的模式?

public interface RoutedEventReceiver<T> 
{ 
    IDisposable Apply(IObservable<T> stream); 
    bool ShouldForwardEvent(T anEvent); 
} 

我想什麼做的是保持這些類的堆棧,它們通過ShouldForwardEvent(T)謂詞被過濾每個事件,並將所得IObservable<T>通過到下一個接收器。我也希望能夠在程序運行時推送和彈出新的接收器(在某些時候,我可能希望從堆棧移動到其他集合,但現在堆棧已足夠)。

我目前的工作,但我不覺得它是非常「Rx」。我相信一定有辦法做到我想要的東西沒有這一切的當務之急邏輯:

private void Refresh() 
{ 
    // _subscriptions is a list of previous subscriptions 
    foreach (var subscription in _subscriptions) 
     subscription.Dispose(); 
    _subscriptions.Clear(); 

    // _stream is my stream of incoming events 
    if (_stream != null) 
    { 
     var stream = _stream; 

     foreach (var eventReceiver in _eventReceivers) 
     { 
      // add the subscription so it can be disposed next Refresh() 
      _subscriptions.Add(eventReceiver.Apply(stream)); 

      // filter the stream for the next event receiver 
      stream = stream.Where(eventReceiver.ShouldForwardEvent); 
     } 
    } 
} 

上述方法被稱爲每當我PushPop在堆棧中。

是否有更清晰,更實用的方式來表達上述意圖?我嘗試過.Publish(),但收效甚微 - 可能我不太清楚。

+0

我發現你的接口不一致:你正在用'IObservable '和'T'直接工作。有沒有像'void Process(T anEvent)'而不是'Apply()'這樣的東西更有意義? – svick

+0

不在我的情況。接收器必須能夠在事件流上使用Rx操作符。我可以改變'ShouldForwardEvent'方法來獲取並返回一個'IObservable '與隱式合約,即源流應該被返回 - 但我不喜歡隱式合約(並且在簡單的時候不需要額外的功能'Where where'就足夠了) – AlexFoxGill

回答

0

我已成功地使Publish方法的工作,但它不提供我要比擺脫需要其他保持的IDisposables列表:

private void Refresh() 
{ 
    _published.DisposeIfNotNull(); 

    if (_stream != null) 
    { 
     var connectable = _stream.Publish(); 
     _published = connectable.Connect(); 
     var stream = connectable.AsObservable(); 

     foreach (var eventReceiver in _eventReceivers) 
     { 
      eventReceiver.Apply(stream); 
      stream = stream.Where(eventReceiver.ShouldForwardEvent); 
     } 
    } 
} 
0

下面的類(命名爲CORStack Chain of Responsibility * Stack),試圖做你以後的事情。它在內部增加一個ShouldHandle布爾到流中,並用它來確定是否處理。它公開了標準PushPopPeek方法。

public sealed class CORStack<T> 
{ 
    Stack<StackFrame> _handlers; 

    public CORStack(IObservable<T> source) 
    { 
     _handlers = new Stack<StackFrame>(); 
     _handlers.Push(new StackFrame(
      source.Select(t => new ShouldHandleWrapper(t, true)), 
      new Handler<T>(new Action<T>(t => { }), true))); 
    } 

    public void Push(Handler<T> handler) 
    { 
     _handlers.Push(new StackFrame(_handlers.Peek().Observable, handler)); 
    } 

    public Handler<T> Peek() 
    { 
     return _handlers.Peek().Handler; 
    } 

    public Handler<T> Pop() 
    { 
     var frame = _handlers.Pop(); 
     frame.Dispose(); 
     return frame.Handler; 
    } 

    class StackFrame : IDisposable 
    { 
     IDisposable _unsub; 

     public IObservable<ShouldHandleWrapper> Observable { get; private set; } 
     public Handler<T> Handler { get; private set; } 

     public StackFrame(IObservable<ShouldHandleWrapper> topOfStack, Handler<T> handler) 
     { 
      _unsub = topOfStack.Subscribe(shouldHandle => 
       { 
        if (shouldHandle.ShouldHandle) 
         handler.Action.Invoke(shouldHandle.Value); 
       }); 
      Observable = topOfStack.Select(shouldHandle => 
       new ShouldHandleWrapper(shouldHandle.Value, shouldHandle.ShouldHandle && handler.Forward)); 
      Handler = handler; 
     } 

     public void Dispose() 
     { 
      _unsub.Dispose(); 
     } 
    } 

    class ShouldHandleWrapper 
    { 
     public readonly T Value; 
     public readonly bool ShouldHandle; 

     public ShouldHandleWrapper(T value, bool shouldHandle) 
     { 
      Value = value; 
      ShouldHandle = shouldHandle; 
     } 
    } 
} 

public class Handler<T> 
{ 
    public Action<T> Action { get; set; } 
    public bool Forward { get; set; } 

    public Handler(Action<T> action, bool forward) 
    { 
     Action = action; 
     Forward = forward; 
    } 
} 

*我意識到這不是一個責任鏈,但不能想到一個更好的名字atm。

+0

對不起,也許我應該澄清一下 - 堆棧的最新成員(或最上面的成員)應該是第一個接收流的成員。這就是爲什麼當前我需要'Refresh()'方法 - 因爲當添加一個新的事件接收器時,流向堆棧頂層成員的流必須改變 – AlexFoxGill

0

這是我實際使用Subjects的情況。爲每個處理程序創建一個主題,然後訂閱該流並循環處理根據需要傳遞事件的處理程序。這樣可以避免不斷地取消訂閱/重新訂閱流(因此也不適用於Refresh方法)。我們使用lock來防止新的接收器在新的值通過流時同時被添加或刪除。如果你能保證不會發生,那麼你可以刪除lock聲明。

public class YourClass<T> : IDisposable 
{ 
    private readonly Stack<Tuple<Subject<T>, RoutedEventReceiver<T>, IDisposable> _handlers; 
    private readonly IObservable<T> _stream; 
    private readonly IDisposable _streamSubscription; 

    public YourClass(IObservable<T> stream) 
    { 
     _handlers = new Stack<Tuple<Subject<T>, RoutedEventReceiver<T>, IDisposable>(); 
     _stream = stream; 
     _streamSubscription = stream.Subscribe(OnNext, OnError, OnCompleted); 
    } 

    public void Dispose() 
    { 
     _streamSubscription.Dispose(); 

     lock (_handlers) 
     { 
      foreach (var h in _handlers) 
      { 
       h.Item3.Dispose(); 
       h.Item1.Dispose(); 
      } 
      _handlers.Clear(); 
     } 
    } 

    private void OnNext(T value) 
    { 
     lock (_handlers) 
     { 
      for (var h in _handlers) 
      { 
       h.Item1.OnNext(value); 
       if (!h.Item2.ShouldForwardEvent(value)) break; 
      } 
     } 
    } 

    private void OnError(Exception e) 
    { 
     lock (_handlers) 
     { 
      for (var h in _handlers) { h.Item1.OnError(e); } 
     } 
    } 

    private void OnCompleted() 
    { 
     lock (_handlers) 
     { 
      for (var h in _handlers) { h.Item1.OnCompleted(); } 
     } 
    } 

    public void Push(RoutedEventReceiver<T> handler) 
    { 
     lock (_handlers) 
     { 
      var subject = new Subject<T>; 
      _handlers.Push(Tuple.Create(subject, handler, handler.Apply(subject))); 
     } 
    } 

    public RoutedEventReceiver<T> Pop() 
    { 
     lock (_handlers) 
     { 
      var handler = _handlers.Pop(); 
      handler.Item3.Dispose(); 
      handler.Item1.Dispose(); 
      return handler.Item2; 
     } 
    } 
}