對於基於Rx的變化跟蹤解決方案,我需要一個運算符,它可以以可觀察序列獲取第一個和最近一個項目。Rx:用於從Observable流獲取第一個和最近值的運算符
我怎麼會寫會產生以下大理石圖的Rx操作(注:括號只是用來陣容的項目......我不知道如何最好地在文本代表此):
xs:---[a ]---[b ]-----[c ]-----[d ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
對於基於Rx的變化跟蹤解決方案,我需要一個運算符,它可以以可觀察序列獲取第一個和最近一個項目。Rx:用於從Observable流獲取第一個和最近值的運算符
我怎麼會寫會產生以下大理石圖的Rx操作(注:括號只是用來陣容的項目......我不知道如何最好地在文本代表此):
xs:---[a ]---[b ]-----[c ]-----[d ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
使用相同的命名爲@Wilka你可以用下面的擴展,是有點不言自明:
public static IObservable<TResult> FirstAndLatest<T, TResult>(this IObservable<T> source, Func<T,T,TResult> func)
{
var published = source.Publish().RefCount();
var first = published.Take(1);
return first.CombineLatest(published, func);
}
注意,它不一定返回Tuple
,而是爲您提供在結果上傳遞選擇器函數的選項。這使它與基本的主要操作(CombineLatest
)保持一致。這顯然很容易改變。
用法(如果你想的元組產生的數據流中):
Observable.Interval(TimeSpan.FromSeconds(0.1))
.FirstAndLatest((a,b) => Tuple.Create(a,b))
.Subscribe(Console.WriteLine);
我懷疑有這樣做的更好的方法(和我不喜歡使用這樣做),但你可以創建一個這樣
public static IObservable<Tuple<T, T>> FirstAndLatest2<T>(this IObservable<T> source)
{
return Observable.Defer(() => {
bool hasFirst = false;
T first = default(T);
return source
.Do(item =>
{
if (!hasFirst)
{
hasFirst = true;
first = item;
}
})
.Select(current => Tuple.Create(first, current));
});
}
操作,那麼你會用它LIK E本:
Observable.Interval(TimeSpan.FromSeconds(0.1))
.FirstAndLatest()
.Subscribe(Console.WriteLine);
FirstAndLatest錯誤。由於缺乏懶惰,hasFirst和First狀態在所有訂閱中共享。所有的自定義操作符應該首先調用Observable.Create,除非它純粹是一個現有操作符組合的「宏」。或者,您也可以在這裏使用Observable.Defer以創建每個訂閱狀態。 – 2012-07-28 22:12:19
謝謝,我錯過了。我已更新我的答案以使用Observable.Defer – Wilka 2012-07-30 08:51:43
試試這個:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
this IObservable<T> source)
{
return
source
.Take(1)
.Repeat()
.Zip(source, (x0, xn) => Tuple.Create(x0, xn));
}
簡單,是吧?
或者,以共享底層源替代,試試這個:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
this IObservable<T> source)
{
return
source.Publish(
s =>
s.Take(1)
.Repeat()
.Zip(s, (x0, xn) => Tuple.Create(x0, xn)));
}
哎呦!抓住這個。它不起作用。它基本上不斷產生一對最新值。這樣發佈不起作用。原始實施是最好的。
您應該添加一個發佈操作符來共享訂閱源的副作用。上面的FirstAndLatest實現將導致兩個訂閱源爲其每個訂閱的結果,這可能會導致大量重複計算(或更糟糕的副作用,如啓動I/O和什麼)。 – 2012-07-28 22:14:07
@BartDeSmet - 我故意用這種方式實現它。如果源觀察值很熱,那麼我們是否不希望爲每個訂閱進行計算?不過,我會做出一個替代實施來避免這些問題。 – Enigmativity 2012-07-29 00:48:39
@Enigmativity仍然認爲'Take(1).CombineLatest'比Take(1).Repeat()。Zip'更合適。 – yamen 2012-07-30 20:39:27
您應該添加一個發佈操作符來共享訂閱源的副作用。上面的FirstAndLatest實現將導致兩個訂閱源爲其每個訂閱的結果,這可能會導致大量重複計算(或更糟糕的副作用,如啓動I/O和什麼)。 – 2012-07-28 22:13:35
我接受這個答案,因爲這是第一個正確答案,儘管Bart的評論讓我想知道如何將Publish()整合到實現中。這不僅僅是在最後加上Publish()的問題。 – Damian 2012-07-29 00:01:37
根據@BartDeSmet我添加了'Publish'(還添加了'RefCount',不知道這是否是首選方式,或者調用'Connect')。 – yamen 2012-07-29 21:20:47