我有一個大的集合一雙簡單的類:使用RX在不同時間觸發事件?
public class Pair { public DateTime Timestamp; public double Value; }
它們通過時間戳升序排序。我想在適當的時候爲列表中的每個項目觸發一個具有值的事件(例如,動作<double>)。時間在過去,所以我需要對時間戳進行標準化,使列表中的第一個「現在」。我們可以使用Reactive Extensions來設置它,以便在兩個項目之間的時間差異之後觸發下一個事件?
我有一個大的集合一雙簡單的類:使用RX在不同時間觸發事件?
public class Pair { public DateTime Timestamp; public double Value; }
它們通過時間戳升序排序。我想在適當的時候爲列表中的每個項目觸發一個具有值的事件(例如,動作<double>)。時間在過去,所以我需要對時間戳進行標準化,使列表中的第一個「現在」。我們可以使用Reactive Extensions來設置它,以便在兩個項目之間的時間差異之後觸發下一個事件?
說pairs
是你的序列:
var obs = pairs.OrderBy(p => p.Timestamp).ToObservable();
現在obs
是對作爲一個有序觀測。
Observable.Zip(
obs,
obs.Take(1).Concat(obs),
(pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp)
.Select(_ => pair1.Value))
.Concat()
.Subscribe(/* Do something here */);
拉鍊負責將絕對時間轉換爲偏移量。這將需要的順序和與自己會合,但偏移減去一個,如下
Original 1--2--4--7--11
Offset 1--1--2--4--7--11
Joined 0--1--2--3--4
那麼這個新的值被放入Observable.Timer
推遲其相應的金額。最後的Concat
將結果從IObservable<IObservable<double>>
變爲IObservable<double>
。這假設你的序列是有序的。
我覺得這個問題很有趣,這將是我第一次參加它。
static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent)
{
if (pairs == null || !pairs.Any() || pairEvent == null)
return;
// if we can promise the pairs are already sorted
// obviously we don't need this next line
pairs = pairs.OrderBy(p => p.Timestamp);
var first = pairs .First().Timestamp;
var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p });
var start = DateTime.Now;
double interval = 250; // 1/4 second
Timer timer = new Timer(interval);
timer.AutoReset = true;
timer.Elapsed += (sender, elapsedArgs) =>
{
var signalTime = elapsedArgs.SignalTime;
var elapsedTime = (signalTime - start);
var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair);
wrapped = wrapped.Skip(pairsToTrigger.Count());
if (!wrapped.Any())
timer.Stop();
foreach (var pair in pairsToTrigger)
pairEvent(pair.Value);
};
timer.Start();
}
如果「使用的Rx」你讓我只需要使用的Rx調度,那麼這是一個非常簡單的解決辦法:
Action<double> action =
x =>
Console.WriteLine(x);
var ts0 = pairs.Select(p => p.Timestamp).Min();
pairs
.ForEach(p =>
Scheduler
.ThreadPool
.Schedule(
p.Timestamp.Subtract(ts0),
() => action(p.Value)));
它使用System.Interactive
擴展ForEach
,但你可以只使用一個常規的foreach
循環來加載調度器。
我測試過的代碼與下面的僞數據:
var pairs = new []
{
new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, },
new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, },
new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, },
};
我希望這有助於。
調度程序是否有自己的隊列?或者這個代碼會咀嚼整個線程池?我只是擔心這個解決方案的可擴展性。 – Brannon
@Brannon - 如果我記得正確的調度程序在內部使用堆排序來排隊的行動。另外,調度程序一次只能執行一個操作,如果另一個操作立即可用,將重新使用當前線程。所以他們一次只能使用一個線程。 – Enigmativity
你看過http://reactiveproperty.codeplex.com/了嗎? – dwerner