2012-03-18 43 views
25

我想給RX查詢是不平凡的模型(對我來說):加入的Rx流

  • 在一個房間裏有男人和女人。
  • 他們進入和離開房間,而在房間裏有時他們改變他們的位置。
  • 每個男人可以在給定的時間看一個(或零)女人。
  • 每個人具有以下屬性:

    class Man 
    { 
        public const int LookingAtNobody = 0; 
        public int Id { get; set; } 
        public double Location { get; set; } 
        public int LookingAt { get; set; } 
    } 
    
  • 每個女人具有以下屬性:

    class Woman 
    { 
        public int Id { get; set; } 
        public double Location { get; set; } 
    } 
    
  • 爲了表示我們有IObservable<IObservable<Man>>男子,並表示我們有IObservable<IObservable<Woman>>婦女。

你將如何使用的Rx產生從男人到女人的載體,他們正在尋找:IObservable<IObservable<Tuple<double,double>>>

幫助,這裏有一些簡單的情況下,一些單元測試:

public class Tests : ReactiveTest 
{ 
    [Test] 
    public void Puzzle1() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(300)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle2() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(350), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle3() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle4() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }), 
      OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }), 
      OnCompleted<Man>(500)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 
     var w2 = scheduler.CreateHotObservable(
      OnNext(155, new Woman { Id = 20, Location = 100.0 }), 
      OnNext(255, new Woman { Id = 20, Location = 200.0 }), 
      OnNext(355, new Woman { Id = 20, Location = 300.0 }), 
      OnCompleted<Woman>(455)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     var expectedVector2 = new[] 
         { 
          OnNext(300, Tuple.Create(3.0, 200.0)), 
          OnNext(355, Tuple.Create(3.0, 300.0)), 
          OnNext(400, Tuple.Create(4.0, 300.0)), 
          OnCompleted<Tuple<double,double>>(455), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
     ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]); 
    } 

    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men) 
    { 
     // assuming nested sequences are hot 
     var vectors = 
      from manDuration in men 
      join womanDuration in women on manDuration equals womanDuration 
      select from man in manDuration 
        join woman in womanDuration on manDuration equals womanDuration 
        where man.LookingAt == woman.Id 
        select Tuple.Create(man.Location, woman.Location); 

     var query = vectors.Select(vectorDuration => 
     { 
      var vectorResults = scheduler.CreateObserver<Tuple<double, double>>(); 
      vectorDuration.Subscribe(vectorResults); 
      return vectorResults.Messages; 
     }); 

     var results = scheduler.Start(() => query, 0, 0, 1000).Messages; 
     return results; 
    } 
} 

(注:這個問題是交叉貼到Rx論壇:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe

+4

DAT IEnumerable <記錄<通知>>>>>> – Asti 2012-03-19 08:54:44

+0

您發佈到MSDN論壇的帖子以及創建的喋喋不休的數量證明這不是一件好事問答網站問題。 – 2013-03-27 23:14:35

+9

「不要過河」 - Egon Spengler博士 – 2013-10-11 16:40:28

回答

1

如果我理解正確,目標是創建一個「跟隨觀察者」的觀察點,其中一個「跟蹤觀察者」從一個男人開始注視一個女人開始,到該男人停止注視該女人時結束。 「follow observable」應該由男人和女人的最近位置的元組組成。

這裏的想法是使用CombineLatest,它將帶有兩個可觀測值,當它們中的任何一個產生一個值時,組合器將針對觀測值的兩個最近值進行評估,這會在組合觀測值中產生一個值。但是,CombineLatest只有在兩個可觀測量都完成時纔會完成。在這種情況下,我們希望在兩個源中的任何一個完成時完成觀察值。爲了做到這一點,我們定義了下面的擴展方法(我不相信這樣的方法已經存在,但也有可能是一個簡單的解決方案):

public static IObservable<TSource> 
    UntilCompleted<TSource, TWhile>(this IObservable<TSource> source, 
             IObservable<TWhile> lifetime) 
{ 
    return Observable.Create<TSource>(observer => 
    { 
    var subscription = source.Subscribe(observer); 
    var limiter = lifetime.Subscribe(next => { },() => 
    { 
     subscription.Dispose(); 
     observer.OnCompleted(); 
    }); 
    return new CompositeDisposable(subscription, limiter); 
    }); 
} 

,此方法類似於TakeUntil,但它代替直到lifetime產生一個值,直到lifetime完成。我們也可以定義一個簡單的擴展方法,它滿足的謂詞的第一條痕:

public static IObservable<TSource> 
    Streak<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate); 
} 

現在對於最終的查詢,我們將所有的人使用CombineLatest所有婦女,並完成可觀察使用早期UntilCompleted。爲了得到「遵守觀察結果」,我們選擇男人正在看着女人的連勝。然後我們簡單地將它映射到一個位置元組。

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    select manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streak(pair => pair.Man.LookingAt == pair.Woman.Id) 
    .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location)); 

這通過所有的測試,但其中一人在看一會兒女人10,然後在20一會兒,然後在10再次有一段時間不處理的情況;僅使用第一個條紋。遵守所有的條紋,我們可以使用下面的擴展方法,它返回一個可觀察的條紋:

public static IObservable<IObservable<TSource>> 
    Streaks<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return Observable.Create<IObservable<TSource>>(observer => 
    { 
    ReplaySubject<TSource> subject = null; 
    bool previous = false; 
    return source.Subscribe(x => 
    { 
     bool current = predicate(x); 
     if (!previous && current) 
     { 
     subject = new ReplaySubject<TSource>(); 
     observer.OnNext(subject); 
     } 
     if (previous && !current) subject.OnCompleted(); 
     if (current) subject.OnNext(x); 
     previous = current; 
    },() => 
    { 
     if (subject != null) subject.OnCompleted(); 
     observer.OnCompleted(); 
    }); 
    }); 
} 

通過訂閱只有一次的源流,並通過使用ReplaySubject,這種方法適用於熱以及冷觀察。現在對於最終的查詢,我們選擇所有條紋如下:

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    from streak in manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id) 
    select streak.Select(pair => 
    Tuple.Create(pair.Man.Location, pair.Woman.Location)); 
0

我不知道我理解你爲什麼建模的男人和女人這兩個位置的流作爲IObservable<IObservable<T>>,而不僅僅是一個IObservable<T>,但是這可能工作:

public static IObservable<Tuple<double, double>> GetLocationsObservable(IObservable<IObservable<Man>> menObservable, 
                      IObservable<IObservable<Woman>> womenObservable) 
{ 
    return Observable.CombineLatest(
     menObservable.Switch(), 
     womenObservable.Switch(), 
     (man, woman) => new {man, woman}) 
      .Where(manAndWoman => manAndWoman.man.LookingAt == manAndWoman.woman.Id) 
      .Select(manAndWoman => Tuple.Create(manAndWoman.man.Location, manAndWoman.woman.Location)); 
} 

時將其壓開關基本上是「開關」新觀察到的,這拉平流。 where和select是相當簡單的。

我有一個偷偷摸摸的懷疑我誤解了需求的一些東西,但我想我會提交我的答案,以防萬一它有幫助。