2012-04-03 50 views
2

我一直在試圖理解的Rx更深入,按照巴特德Smetts MinLinq和Jon長柄水杓「重新實現」系列,我建立了一個很好的理解,但...IObservable的Where()擴展方法是如何實現的?

以下列代碼爲例

var onePerSecond = Observable.Interval(TimeSpan.FromSeconds(1)); 
var evenNums = onePerSecond.Where(x => x % 2 == 0); 
evenNums.Subscribe(Console.WriteLine); 

從equivelant 的IEnumerable的觀點我理解的MoveNext /當前的數據流,並從長柄水杓先生的博客如何Where方法可以使用的foreach超過了IEnumerable實現‘這個’延長參數方法。

但是,在IObservable的Where方法的情況下,它是否會包含實現IObserver接口(或lambda equivelant)的代碼,因此會有效地觀察來自onePerSecond對象的所有通知,然後返回僅包含值的IObservable謂詞被發現是真實的?

任何幫助和想法非常歡迎,非常感謝

詹姆斯

回答

3

通過尋找與ILSpy的源代碼可以很容易地看到,正是在哪裏如何實現。它返回一個新的觀察到的,其過濾基於謂詞傳遞物品:

public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate) 
{ 
    if (source == null) 
    { 
     throw new ArgumentNullException("source"); 
    } 
    if (predicate == null) 
    { 
     throw new ArgumentNullException("predicate"); 
    } 
    return new AnonymousObservable<TSource>((IObserver<TSource> observer) => source.Subscribe(delegate(TSource x) 
    { 
     bool flag; 
     try 
     { 
      flag = predicate(x); 
     } 
     catch (Exception error) 
     { 
      observer.OnError(error); 
      return; 
     } 
     if (flag) 
     { 
      observer.OnNext(x); 
     } 
    } 
    , new Action<Exception>(observer.OnError), new Action(observer.OnCompleted))); 
} 
+0

嗨Giorgi的,非常感謝,那太棒了!我應該想到:-)感謝您的幫助。詹姆斯 – jameschinnock 2012-04-04 07:36:15

+0

如果我建立這個,我得到構建錯誤''System.Reactive.AnonymousObservable '由於其保護級別是不可訪問的' – 2012-05-28 10:51:43

+0

AnonymousObservable是反應擴展的內部,但有一個公共方法Observable.Create,可以讓您觀察類似,https://gist.github.com/2819921 – 2012-05-28 16:36:36

1

下面是一些玩具例子讓它是如何工作的感覺:

https://github.com/ScottWeinstein/Rx-Demo/tree/master/ImplementWhereDemo

public class WhereObservableLessPedantic<T> : IObservable<T> 
{ 
    private Func<T, bool> _pred; 
    private IObservable<T> _stream; 

    public WhereObservableLessPedantic(IObservable<T> stream, Func<T, bool> pred) 
    { 
     _pred = pred; 
     _stream = stream; 
    } 

    public IDisposable Subscribe(IObserver<T> downStreamObserver) 
    { 
     Action<T> onNext = nextVal => 
     { 
      if (_pred(nextVal)) 
       downStreamObserver.OnNext(nextVal); 
     }; 
     return _stream.Subscribe(onNext); 
    } 
} 


public class WhereObserverPedantic<T> : IObserver<T> 
{ 
    private IObserver<T> _downStreamObserver; 
    private Func<T, bool> _pred; 

    public WhereObserverPedantic(IObserver<T> downStreamObserver, Func<T, bool> pred) 
    { 
     _pred = pred; 
     _downStreamObserver = downStreamObserver; 
    } 

    public void OnNext(T value) 
    { 
     if (_pred(value)) 
     { 
      _downStreamObserver.OnNext(value); 
     } 
    } 

    public void OnCompleted() { } 
    public void OnError(Exception error) { } 
} 
+0

謝謝斯科特,多數民衆贊成。我將在現在發送的鏈接中查看代碼。非常感謝!詹姆士 – jameschinnock 2012-04-04 07:37:13