2012-09-29 32 views
3

我有以下類型...獲取下一個事件序列中每秒有反應的擴展

public class NewsFeed 
{ 
    public event EventHandler<NewsItemEventArgs> NewItem; 

    ..... 

} 

public class NewsItemEventArgs : EventArgs 
{ 
    public NewsItem Item; 
    public NewsItemEventArgs(NewsItem newsItem) 
    { 
     Item = newsItem; 
    } 
} 

public class NewsItem 
{ 
    public int Id { get; set; } 
    public string Title { get; set; } 
    public string Body { get; set; } 
} 

新聞推送的的newitem事件觸發型NewsItemEventArgs的EventArgs的事件。在我的系統中,事件以突發形式發佈,例如在1秒鐘的小窗口中顯示10個NewsItems,然後在60秒內沒有其他新聞報道。我想用RX平滑這些爆發,所以我的UI新聞故事「出現」以更常規的時間間隔5秒到達。

我知道我需要創建一個可觀察下面

_source = Observable.FromEventPattern<NewsItemEventArgs>(
         h => _newsFeed.NewItem += h, 
         h => _newsFeed.NewItem -= h); 

有點像,但我不知道如何變換和訂閱觀察到這樣,我得到滴喂的事件,而不是他們在未來上述爆發。

任何想法?

回答

2

你可以與其他哪些定期生成值壓縮您的序列:

Observable<NewsItem> nis = _source 
    .Zip(Observable.Timer(Timespan.FromSeconds(5), TimeSpan.FromSeconds(5)), (e, _) => e) 
    .Select(eventArgs => eventArgs.Item); 
+3

如果觀察到的定時器運行提前源這是行不通的。如果確實如此,那麼當信息源將它們推出時,這些值將立即出現。 – Enigmativity

+1

如果計時器通知和源通知的數量不匹配,則會發生事件積壓。 – Asti

2

郵編可能不是此操作的最佳選擇,因爲有一個機會,爲生產者要慢,有時,產生的緊張的輸出。

似乎準確scheduling with DateTimeOffset仍然是不可能與Rx 2.0。 TimeSpan雖然現在工作。您可以通過將TimeSpan偏移量替換爲DateTimeOffset來嘗試。總之,如果我們可以指定兩個連續值之間的最小間隔,我們可以解決突發問題。

static IObservable<T> DelayBetweenValues<T>(this IObservable<T> observable, TimeSpan interval, IScheduler scheduler) 
    { 
     return Observable.Create<T>(observer => 
     { 
      var offset = TimeSpan.Zero; 
      return observable 
       .TimeInterval(scheduler) 
       .Subscribe 
       (
        ts => 
        { 
         if (ts.Interval < interval) 
         { 
          offset = offset.Add(interval); 
          scheduler.Schedule(offset,() => observer.OnNext(ts.Value)); 
         } 
         else 
         { 
          offset = TimeSpan.Zero; 
          observer.OnNext(ts.Value); 
         } 
        } 
       ); 
     }); 
    } 

測試:

 Observable.Interval(TimeSpan.FromSeconds(2.5)) 
        .Do(_ => Console.WriteLine("Burst")) 
        .SelectMany(i => Enumerable.Range((int)i, 10)) 
        .DelayBetweenValues(TimeSpan.FromSeconds(0.2), TaskPoolScheduler.Default) 
        .Subscribe(Console.WriteLine); 
1

我無法從阿斯蒂答案工作。所以我用delayDurationSelector嘗試了無文檔的Delay重載。我得到它的工作對我的問題,但不知何故它的行爲不正常,當我使用的調度器,定時器,但它的工作原理沒有它:

public static IObservable<T> DelayBetweenValues<T>(this IObservable<T> observable, TimeSpan interval, 
    IScheduler scheduler) 
{ 
     var offset = TimeSpan.Zero; 
     return observable 
      .TimeInterval(scheduler) 
      .Delay(ti => 
      { 
       offset = (ti.Interval < interval) ? offset.Add(interval) : TimeSpan.Zero; 
       return Observable.Timer(offset); 
      }) 
      .Select(ti => ti.Value); 
}