2016-09-06 28 views
0

由於源供應商象下面這樣:加入未知數量的來源時,所有來源包含給定鍵

IObservable<ISource> Sources(); 

每個東森光電看起來像下面:

IObservable<IEnumerable<string>> ObserveData(string filter) 

我想退:

IObservable<IEnumerable<string>> Results 

當從所有ISources返回給定字符串時。基本上我想要所有來源的交集。

如果添加了新來源,則應重新評估所有內容。

我很努力想出一個通用的解決方案。我見過的大多數解決方案都有衆所周知的來源。 任何想法讚賞。

回答 好了之後想了一會兒,我想出了我的答案。可能它可以改進,但它似乎適用於我,所以我會張貼在這裏作爲參考,以防其他人有類似的問題。感謝ibebbs和Shlomo花時間回覆,非常感謝。

//Arrange 
     var s1 = Substitute.For<ISource>(); 
     s1.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "a", "b", "c", "d" })); 

     var s2 = Substitute.For<ISource>(); 
     s2.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "b", "xx", "c", "d" })); 

     var s3 = Substitute.For<ISource>(); 
     s3.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "yy", "b", "ff", "d" })); 

     var expected = new[] { "b", "d" }; 

     var sources = new[] { s1, s2, s3 }.ToObservable(); 

     var scheduler = new TestScheduler(); 
     var observer = scheduler.CreateObserver<IList<string>>(); 

     //Act 
     sources.Buffer(TimeSpan.FromMilliseconds(500), scheduler) 
      .Select(s => Observable.CombineLatest(s.Select(x => x.ObserveData("NoFilter")))) 
      .Switch() 
      .Select(x =>IntersectAll(x)) 
      .Do(x => Console.WriteLine($"Recieved {string.Join("," , x)}")) 
      .Subscribe(observer); 

     scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks); 

     //Assert 
     observer.Messages.AssertEqual(
      OnNext<IList<string>>(0, s => s.SequenceEqual(expected)), 
      OnCompleted<IList<string>>(0)); 

對於IntersectAll,看到Intersection of multiple lists with IEnumerable.Intersect()

+0

是結果屬性或方法?如果一個屬性,過濾器參數應該傳入什麼? – Shlomo

回答

1

好,第二次嘗試,我敢肯定這是你所需要的(測試夾具包含在底部):

public interface ISource 
{ 
    IObservable<IEnumerable<string>> ObserveData(string filter); 
} 

public static class ArbitrarySources 
{ 
    public static IObservable<IEnumerable<string>> Intersection(this IObservable<ISource> sourceObservable, string filter) 
    { 
     return sourceObservable 
      .SelectMany((source, index) => source.ObserveData(filter).Select(values => new { Index = index, Values = values })) 
      .Scan(ImmutableDictionary<int, IEnumerable<string>>.Empty, (agg, tuple) => agg.SetItem(tuple.Index, tuple.Values)) 
      .Select(dictionary => dictionary.Values.Aggregate(Enumerable.Empty<string>(), (agg, values) => agg.Any() ? agg.Intersect(values) : values).ToArray());  
    } 
} 

public class IntersectionTest 
{ 
    internal class Source : ISource 
    { 
     private readonly IObservable<IEnumerable<string>> _observable; 

     public Source(IObservable<IEnumerable<string>> observable) 
     { 
      _observable = observable; 
     } 

     public IObservable<IEnumerable<string>> ObserveData(string filter) 
     { 
      return _observable; 
     } 
    } 

    [Fact] 
    public void ShouldIntersectValues() 
    { 
     TestScheduler scheduler = new TestScheduler(); 

     var sourceA = new Source(scheduler.CreateColdObservable(
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b" })), 
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b", "c" })) 
     )); 

     var sourceB = new Source(scheduler.CreateColdObservable(
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "c" })), 
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "b", "c" })) 
     )); 

     var sources = scheduler.CreateColdObservable(
      new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<ISource>(sourceA)), 
      new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext<ISource>(sourceB)) 
     ); 

     var observer = scheduler.Start(() => sources.Intersection("test"), 0, 0, TimeSpan.FromSeconds(6).Ticks); 

     IEnumerable<string>[] actual = observer.Messages 
      .Select(message => message.Value) 
      .Where(notification => notification.Kind == NotificationKind.OnNext && notification.HasValue) 
      .Select(notification => notification.Value) 
      .ToArray(); 

     IEnumerable<string>[] expected = new [] 
     { 
      new [] { "a", "b" }, 
      new [] { "a" }, 
      new [] { "a", "c" }, 
      new [] { "b", "c" } 
     }; 

     Assert.Equal(expected.Length, actual.Length); 

     foreach (var tuple in expected.Zip(actual, (e, a) => new { Expected = e, Actual = a })) 
     { 
      Assert.Equal(tuple.Expected, tuple.Actual); 
     } 
    } 
} 

這種方法具有不重新查詢現有源時增加了新的源的附加益處但每個任意源發出值時間將重新計算的交叉點。

+0

是的,這是有效的增​​強。謝謝 – user630190

0

如何:

public IObservable<IEnumerable<string>> From(this IObservable<ISource> sources, string filter) 
{ 
    return sources 
     .Scan(Observable.Empty<IEnumerable<string>>(), (agg, source) => Observable.Merge(agg, source.ObserveData(filter))) 
     .Switch(); 
} 

要知道,新源從sources所有先前已發出將有源發出的每一次他們的ObserveData方法再次調用。因此,該解決方案沒有規模特別好,但不符合你的「如果一個新的源被添加那麼一切都應該重新評估」的要求

+0

乍一看,它看起來會合並所有來源,而不是找到交集。因此,如果源代碼輸出[「a」,「b」,「c」]和源代碼兩個輸出[「b」,「a」],我只想得到[「a」,「b」]。 – user630190

相關問題