假設我有一組我正在監視可用性的URI。每個URI是「向上」或「向下」,而新的URI來監控可以隨時添加到系統:結合最新的可觀察事件
public enum ConnectionStatus
{
Up,
Down
}
public class WebsiteStatus
{
public string Uri
{
get;
set;
}
public ConnectionStatus Status
{
get;
set;
}
}
public class Program
{
static void Main(string[] args)
{
var statusStream = new Subject<WebsiteStatus>();
Test(statusStream);
Console.WriteLine("Done");
Console.ReadKey();
}
private static void Test(IObservable<WebsiteStatus> statusStream)
{
}
}
現在Test()
假設我要被動地查明:
- 是否所有的URI下降(作爲
bool
) - 其URI是下降(如
IEnumerable<string>
)
所以Test
竟又肌酸g IObservable<Tuple<bool, IEnumerable<string>>>
這樣的可觀察值,其中bool
指示是否所有的URI都關閉,並且IEnumerable<string>
包含那些是的URI。
我該如何解決這個問題?我最初的想法是,我需要按URI進行分組,然後將每個組的最新信息合併到一個列表中,然後我可以執行Select
。但是,由於CombineLatest
的工作方式,這並未奏效。
編輯:感謝馬修的答案,我看着RXX,發現它完全時尚我會在RX預料開箱實施CombineLatest
超載,但我需要這樣它發佈到更改即使只有一個源數據流被合併(默認情況下,它正在等待至少兩個源數據流)。另外,爲了一種方法,我不能證明需要額外增加2MB的二進制文件,所以我將其複製/粘貼到我的項目中。這樣做,我是能夠解決如下:
private static void Test(IObservable<WebsiteStatus> statusStream)
{
statusStream
.GroupBy(x => x.Uri)
.CombineLatest()
.Select(
x =>
{
var down = x.Where(y => y.Status == ConnectionStatus.Down);
var downCount = down.Count();
var downUris = down.Select(y => y.Uri).ToList();
return new
{
AllDown = x.Count == downCount,
DownUris = downUris
};
})
.Subscribe(x =>
{
Console.WriteLine(" Sources down ({0}): {1}", x.AllDown ? "that's all of them" : "some are still up", x.DownUris.Aggregate("", (y, z) => y += (z + " | ")));
});
}
謝謝你保存我的理智。已經更新了我的問題,並最終實現了我的解決方案。 –