2013-04-15 58 views
1

我剛開始使用 Reactive Extensions(Rx)框架。這個問題基於以下範例:捕獲異常(Rx 乒乓範例)

http://rxwiki.wikidot.com/101samples(乒乓)

我在自己的應用程式中做了類似的實作。我不知道的是,如何把一個 Action 傳遞給 OnError(這個 Action 來自類別外部,也就是建立並使用該物件的地方)。

我該怎麼辦?

/// <summary>
/// "Ping" half of the ping/pong pair. As an observer of <see cref="PFocusPong"/> it
/// sends a keep-alive frame to the device on every notification; as an observable it
/// publishes itself each time the device raises <c>DataReceived</c> with a successful,
/// non-empty payload.
/// </summary>
public class PFocusPing : ISubject<PFocusPong, PFocusPing>
{
    private readonly AdvancedConnection device;

    /// <summary>Creates a ping bound to the given device connection.</summary>
    /// <param name="con">Connection used to send keep-alives and to listen for received data.</param>
    public PFocusPing(AdvancedConnection con)
    {
        this.device = con;
    }

    #region Implementation of IObserver<PFocusPong>
    /// <summary>Sends the keep-alive data to the device and logs the received pong.</summary>
    /// <param name="value">The pong that triggered this notification (not inspected).</param>
    public void OnNext(PFocusPong value)
    {
        this.device.SendData(clsPowerFocus.DataKeepAlive);
        Console.WriteLine("PFocusPing received PFocusPong.");
    }

    /// <summary>Logs that the observed sequence terminated with an error.</summary>
    /// <param name="exception">The error reported by the source (only its occurrence is logged).</param>
    public void OnError(Exception exception)
    {
        Console.WriteLine("PFocusPing experienced an exception and had to quit playing.");
    }

    /// <summary>Logs that the observed sequence completed.</summary>
    public void OnCompleted()
    {
        Console.WriteLine("PFocusPing finished.");
    }
    #endregion

    #region Implementation of IObservable<PFocusPing>
    /// <summary>
    /// Subscribes <paramref name="observer"/> to the device's <c>DataReceived</c> event,
    /// forwarding this instance for every event that carries a successful result and
    /// a non-empty data buffer.
    /// </summary>
    /// <param name="observer">Receiver of the ping notifications.</param>
    /// <returns>A handle that detaches the event handler when disposed.</returns>
    public IDisposable Subscribe(IObserver<PFocusPing> observer)
    {
        return Observable.FromEventPattern<Args<DataReceive>>(
                add => device.DataReceived += add,
                rem => device.DataReceived -= rem)
            // Null/empty checks must run BEFORE anything dereferences EventArgs.Value;
            // otherwise an event without a payload raises NullReferenceException
            // instead of simply being filtered out.
            .Where(obj => obj.EventArgs != null &&
                          obj.EventArgs.Value != null &&
                          obj.EventArgs.Value.Data != null &&
                          obj.EventArgs.Value.Data.Length > 0)
            .Where(obj => obj.EventArgs.Value.Result == DataReceive.CONNECTION_RESULT.SUCESS)
            .Select(obj => this)
            .Subscribe(observer);
    }
    #endregion

    #region Implementation of IDisposable
    /// <summary>Signals completion by invoking <see cref="OnCompleted"/>; holds no resources itself.</summary>
    public void Dispose()
    {
        OnCompleted();
    }
    #endregion
}

/// <summary>
/// "Pong" half of the ping/pong pair. As an observer of <see cref="PFocusPing"/> it
/// records when the last ping arrived; as an observable it emits itself on a fixed
/// interval and fails with <see cref="TimeoutException"/> when no "alive" tick has
/// been seen within the configured timeout.
/// </summary>
public class PFocusPong : ISubject<PFocusPing, PFocusPong>
{
    private readonly int timeout;
    private readonly int reply;
    private DateTime lastAnswer;

    /// <summary>Creates a pong with the given tick period and liveness timeout.</summary>
    /// <param name="msPing">Tick period in milliseconds; values below 1000 are raised to 1000.</param>
    /// <param name="msTimeout">Liveness timeout in milliseconds; values below 2000 are raised to 2000.</param>
    public PFocusPong(int msPing, int msTimeout)
    {
        this.reply = Math.Max(1000, msPing);
        this.timeout = Math.Max(2000, msTimeout);
    }

    #region Implementation of IObserver<PFocusPing>

    /// <summary>Records the arrival time of the ping and logs it.</summary>
    /// <param name="value">The ping that triggered this notification (not inspected).</param>
    public void OnNext(PFocusPing value)
    {
        this.lastAnswer = DateTime.Now;
        Console.WriteLine("PFocusPong received Ping.");
    }

    /// <summary>Logs that the observed sequence terminated with an error.</summary>
    /// <param name="exception">The error reported by the source (only its occurrence is logged).</param>
    public void OnError(Exception exception)
    {
        Console.WriteLine("PFocusPong experienced an exception and had to quit playing.");
    }

    /// <summary>Logs that the observed sequence completed.</summary>
    public void OnCompleted()
    {
        Console.WriteLine("PFocusPong finished.");
    }

    #endregion

    #region Implementation of IObservable<PFocusPong>

    /// <summary>
    /// Starts the periodic tick and subscribes <paramref name="observer"/> to it.
    /// The observer receives this instance on every tick; if no tick is classified
    /// as "alive" within the timeout, the observer's <c>OnError</c> is called with a
    /// <see cref="TimeoutException"/> and the sequence ends.
    /// </summary>
    /// <param name="observer">Receiver of the pong notifications and of the timeout error.</param>
    /// <returns>A handle that stops the timer and the watchdog when disposed.</returns>
    public IDisposable Subscribe(IObserver<PFocusPong> observer)
    {
        // One shared timer: Publish().RefCount() makes both consumers below observe
        // the same ticks instead of each starting its own cold Timer (which doubled
        // the timers and the "True"/"False" log lines).
        var keepAlive = Observable
            .Timer(TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(reply))
            .Select(obj =>
            {
                // A tick counts as alive when the last ping is younger than one period.
                if ((DateTime.Now - this.lastAnswer).TotalMilliseconds < reply)
                {
                    Console.WriteLine("True");
                    return true;
                }

                Console.WriteLine("False");
                return false;
            })
            .Publish()
            .RefCount();

        // Watchdog: contributes no elements, only the TimeoutException raised when no
        // alive tick shows up in time. Previously this ran as a separate subscription
        // without an error handler, so the exception never reached the observer.
        var watchdog = keepAlive
            .Where(alive => alive)
            .Timeout(TimeSpan.FromMilliseconds(timeout))
            .IgnoreElements()
            .Select(alive => this);

        // Merging routes the watchdog's error into the observer's OnError and tears
        // down the timer in the same step.
        return keepAlive
            .Select(n => this)
            .Merge(watchdog)
            .Subscribe(observer);
    }

    #endregion

    #region Implementation of IDisposable

    /// <summary>Signals completion by invoking <see cref="OnCompleted"/>; holds no resources itself.</summary>
    public void Dispose()
    {
        OnCompleted();
    }

    #endregion

}

回答

1

我不確定你的問題是什麼。但是,你的困惑可能來自你的出發點。我強烈建議您不要自行實作 ISubject<T1, T2>、ISubject<T>、IObserver<T> 或 IObservable<T> 介面。

您應該使用 Subscribe(Action<T>, Action<Exception>, Action) 擴展方法來訂閱可觀察序列(IObservable<T>),並使用 Observable.Create 工廠方法或其它轉換方法(如 FromEvent、ToObservable 等)來建立序列。

而且乒乓範例並不真正符合 Rx 的精神。Rx 是關於觀察和組合事件序列的,而乒乓看起來更像是消息傳遞。Rx 通常是單向的,例如:我監聽鍵盤事件然後做些事情,或者監聽溫度變化、新的價格等等……

當你收到按鍵事件時,通常不會再把按鍵推送回去。