2012-05-03 72 views
5

我有一個大的集合一雙簡單的類:使用RX在不同時間觸發事件?

public class Pair { public DateTime Timestamp; public double Value; } 

它們通過時間戳升序排序。我想在適當的時候爲列表中的每個項目觸發一個具有值的事件(例如,動作<double>)。時間在過去,所以我需要對時間戳進行標準化,使列表中的第一個「現在」。我們可以使用Reactive Extensions來設置它,以便在兩個項目之間的時間差異之後觸發下一個事件?

+0

你看過http://reactiveproperty.codeplex.com/了嗎? – dwerner

回答

6

pairs是你的序列:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable(); 

現在obs是對作爲一個有序觀測。

Observable.Zip(
    obs, 
    obs.Take(1).Concat(obs), 
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp) 
     .Select(_ => pair1.Value)) 
.Concat() 
.Subscribe(/* Do something here */); 

拉鍊負責將絕對時間轉換爲偏移量。這將需要的順序和與自己會合,但偏移減去一個,如下

Original 1--2--4--7--11 
Offset 1--1--2--4--7--11 
Joined 0--1--2--3--4 

那麼這個新的值被放入Observable.Timer推遲其相應的金額。最後的Concat將結果從IObservable<IObservable<double>>變爲IObservable<double>。這假設你的序列是有序的。

+0

很好的解決方案。我會添加'var orderedObs = pairs.OrderBy(p => p.Timestamp).ToObservable()'來明確需要發生什麼並使用它。我做了這些更改.. – yamen

+0

這有助於很多。我用它來查詢歷史數據,並像它最初記錄的那樣回放它。用來證明新系統工作的模擬器。 –

+0

花了我一段時間來弄清楚究竟發生了什麼,但我現在明白了。 Rx是一個mindf ***。真棒解決方案,但。 +1 – BFree

0

我覺得這個問題很有趣,這將是我第一次參加它。

static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent) 
{ 
    if (pairs == null || !pairs.Any() || pairEvent == null) 
    return; 

    // if we can promise the pairs are already sorted 
    // obviously we don't need this next line 
    pairs = pairs.OrderBy(p => p.Timestamp); 
    var first = pairs .First().Timestamp; 
    var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p }); 

    var start = DateTime.Now; 

    double interval = 250; // 1/4 second 
    Timer timer = new Timer(interval); 

    timer.AutoReset = true; 
    timer.Elapsed += (sender, elapsedArgs) => 
    { 
    var signalTime = elapsedArgs.SignalTime; 
    var elapsedTime = (signalTime - start); 

    var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair); 
    wrapped = wrapped.Skip(pairsToTrigger.Count()); 

    if (!wrapped.Any()) 
     timer.Stop(); 

    foreach (var pair in pairsToTrigger) 
     pairEvent(pair.Value);  
    }; 

    timer.Start(); 
} 
+0

由於Rx具有諸如「Timer」,「Defer」和「Delay」等擴展名,這實際上是非常複雜的。 – yamen

+0

@yamen我從來沒有使用Rx,也沒有真正計劃。我想回應如何從頭開始做一個挑戰,因爲我認爲這是一個衝突:)抱歉,如果我在這方面的答案只是垃圾郵件。 – payo

+2

不需要道歉,我希望你從上面的Rx解決方案中學到一些東西。實際上,你的答案就是爲什麼Rx很好的例子:-) – yamen

2

如果「使用的Rx」你讓我只需要使用的Rx調度,那麼這是一個非常簡單的解決辦法:

Action<double> action = 
    x => 
     Console.WriteLine(x); 

var ts0 = pairs.Select(p => p.Timestamp).Min(); 

pairs 
    .ForEach(p => 
     Scheduler 
      .ThreadPool 
      .Schedule(
       p.Timestamp.Subtract(ts0), 
       () => action(p.Value))); 

它使用System.Interactive擴展ForEach,但你可以只使用一個常規的foreach循環來加載調度器。

我測試過的代碼與下面的僞數據:

var pairs = new [] 
{ 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, }, 
}; 

我希望這有助於。

+0

調度程序是否有自己的隊列?或者這個代碼會咀嚼整個線程池?我只是擔心這個解決方案的可擴展性。 – Brannon

+0

@Brannon - 如果我記得正確的調度程序在內部使用堆排序來排隊的行動。另外,調度程序一次只能執行一個操作,如果另一個操作立即可用,將重新使用當前線程。所以他們一次只能使用一個線程。 – Enigmativity

相關問題