2013-11-26 111 views

回答

31

你可以自由地添加此功能,您的Rx運營商,而你正在開發他們看到發生了什麼:

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null) 
    { 
     opName = opName ?? "IObservable"; 
     Console.WriteLine("{0}: Observable obtained on Thread: {1}", 
          opName, 
          Thread.CurrentThread.ManagedThreadId); 

     return Observable.Create<T>(obs => 
     { 
      Console.WriteLine("{0}: Subscribed to on Thread: {1}", 
           opName, 
           Thread.CurrentThread.ManagedThreadId); 

      try 
      { 
       var subscription = source 
        .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}", 
               opName, 
               x, 
               Thread.CurrentThread.ManagedThreadId), 
         ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}", 
               opName, 
               ex, 
               Thread.CurrentThread.ManagedThreadId), 
         () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}", 
               opName, 
               Thread.CurrentThread.ManagedThreadId) 
        ) 
        .Subscribe(obs); 
       return new CompositeDisposable(
        subscription, 
        Disposable.Create(() => Console.WriteLine(
          "{0}: Cleaned up on Thread: {1}", 
          opName, 
          Thread.CurrentThread.ManagedThreadId))); 
      } 
      finally 
      { 
       Console.WriteLine("{0}: Subscription completed.", opName); 
      } 
     }); 
    } 

下面是一個例子使用,顯示了Range一個微妙的行爲差異:

Observable.Range(0, 1).Spy("Range").Subscribe(); 

給出輸出:

Range: Observable obtained on Thread: 7 
Range: Subscribed to on Thread: 7 
Range: Subscription completed. 
Range: OnNext(0) on Thread: 7 
Range: OnCompleted() on Thread: 7 
Range: Cleaned up on Thread: 7 

但是這樣:

Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe(); 

給人的輸出:

Range: Observable obtained on Thread: 7 
Range: Subscribed to on Thread: 7 
Range: OnNext(0) on Thread: 7 
Range: OnCompleted() on Thread: 7 
Range: Subscription completed. 
Range: Cleaned up on Thread: 7 

現貨的區別?

很明顯,你可以改變這個寫入日誌或調試,或使用預處理器指令做一個發佈版本等精益傳遞訂閱...

您可以在整個運營鏈應用Spy 。例如: -

Observable.Range(0,3).Spy("Range") 
      .Scan((acc, i) => acc + i).Spy("Scan").Subscribe(); 

給人的輸出:

Range: Observable obtained on Thread: 7 
Scan: Observable obtained on Thread: 7 
Scan: Subscribed to on Thread: 7 
Range: Subscribed to on Thread: 7 
Range: Subscription completed. 
Scan: Subscription completed. 
Range: OnNext(1) on Thread: 7 
Scan: OnNext(1) on Thread: 7 
Range: OnNext(2) on Thread: 7 
Scan: OnNext(3) on Thread: 7 
Range: OnCompleted() on Thread: 7 
Scan: OnCompleted() on Thread: 7 
Range: Cleaned up on Thread: 7 
Scan: Cleaned up on Thread: 7 

我敢肯定,你可以找到豐富這個根據自己的意圖的方式。

+1

如果Do(x => Console.WriteLine(...))不夠用,這是一個非常不錯的解決方案。 –

+0

謝謝布萊恩。我完全同意Do(...)*大部分時間都足夠了。許多較爲棘手的問題都是圍繞着評估和訂閱問題展開的,可以幫助解決這些問題。它使Rx運營商的整個生命週期更加明顯。 –

+1

通過將訂閱放在字段中,然後使用Console.WriteLine將其封裝在CompositeDisposable中,可以讓用戶記錄退訂。尼斯。 – Benjol