2014-02-06 107 views
2

假設我有一組我正在監視可用性的URI。每個URI是「向上」或「向下」,而新的URI來監控可以隨時添加到系統:結合最新的可觀察事件

public enum ConnectionStatus 
{ 
    Up, 
    Down 
} 

public class WebsiteStatus 
{ 
    public string Uri 
    { 
     get; 
     set; 
    } 

    public ConnectionStatus Status 
    { 
     get; 
     set; 
    } 
} 

public class Program 
{ 
    static void Main(string[] args) 
    { 
     var statusStream = new Subject<WebsiteStatus>(); 
     Test(statusStream); 

     Console.WriteLine("Done"); 
     Console.ReadKey(); 
    } 

    private static void Test(IObservable<WebsiteStatus> statusStream) 
    { 
    } 
} 

現在Test()假設我要被動地查明:

  • 是否所有的URI下降(作爲bool
  • 其URI是下降(如IEnumerable<string>

所以Test竟又肌酸g IObservable<Tuple<bool, IEnumerable<string>>>這樣的可觀察值,其中bool指示是否所有的URI都關閉,並且IEnumerable<string>包含那些是的URI。

我該如何解決這個問題?我最初的想法是,我需要按URI進行分組,然後將每個組的最新信息合併到一個列表中,然後我可以執行Select。但是,由於CombineLatest的工作方式,這並未奏效。

編輯:感謝馬修的答案,我看着RXX,發現它完全時尚我會在RX預料開箱實施CombineLatest超載,但我需要這樣它發佈到更改即使只有一個源數據流被合併(默認情況下,它正在等待至少兩個源數據流)。另外,爲了一種方法,我不能證明需要額外增加2MB的二進制文件,所以我將其複製/粘貼到我的項目中。這樣做,我是能夠解決如下:

private static void Test(IObservable<WebsiteStatus> statusStream) 
{ 
    statusStream 
     .GroupBy(x => x.Uri) 
     .CombineLatest() 
     .Select(
      x => 
      { 
       var down = x.Where(y => y.Status == ConnectionStatus.Down); 
       var downCount = down.Count(); 
       var downUris = down.Select(y => y.Uri).ToList(); 

       return new 
       { 
        AllDown = x.Count == downCount, 
        DownUris = downUris 
       }; 
      }) 
     .Subscribe(x => 
     { 
      Console.WriteLine(" Sources down ({0}): {1}", x.AllDown ? "that's all of them" : "some are still up", x.DownUris.Aggregate("", (y, z) => y += (z + " | "))); 
     }); 
} 

回答

4

最巧妙的方法是使用RXx中擴展的this answer。下面是一個替代方案,它只是保存一個正在關閉/關閉的網站列表。

var downStream = statusStream 
    .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (down, newStatus) => 
    { 
     if (newStatus.IsUp) 
      return down.Where(uri => uri != newStatus.Uri); 
     else if (!down.Contains(newStatus.Uri)) 
      return down.Concat(new string[] { newStatus.Uri }); 
     else 
      return down; 
    }); 

var upStream = statusStream 
    .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (up, newStatus) => 
    { 
     if (!newStatus.IsUp) 
      return up.Where(uri => uri != newStatus.Uri); 
     else if (!up.Contains(newStatus.Uri)) 
      return down.Concat(new string[] { newStatus.Uri }); 
     else 
      return up; 
    }); 

var allDown = upStream.Select(up => !up.Any()); 
+0

謝謝你保存我的理智。已經更新了我的問題,並最終實現了我的解決方案。 –