2012-07-01 53 views
6

我有一堆事件進來,我必須毫不猶豫地執行所有這些事件,但我想確保它們在適當的時間段被緩衝和使用。任何人都有解決方案?什麼是使用Observable「限制」消費的最佳方式?

我找不到任何的Rx運營商可以做,沒有事件的損失(油門 - 失去的事件)。我也考慮過緩衝,延遲等等......找不到一個好的解決方案。

我試圖把一個計時器在中間,但不知何故,沒有在所有的工作:

GetInitSequence() 
      .IntervalThrottle(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime) 
    { 
     return Observable.Create<T>(o => 
      { 
       return source.Subscribe(x => 
        { 
         new Timer(state => 
          o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1)); 
        }, o.OnError, o.OnCompleted); 
     }); 
    } 
+0

你可以添加一個大理石運營商digram顯示你有什麼,以及你想要什麼?像其他人一樣,我不確定你想要實現什麼,因爲我認爲緩衝區就好。 –

+0

你是什麼限制? – Fredrick

回答

10

的問題是不是100%清楚,所以我正在做一些假設。

Observable.Delay是不是你想要的,因爲這將創建一個從當每個事件到達,而不是爲處理創建等時間間隔的延遲。

Observable.Buffer是不是你想要的,因爲這將導致在每個給定的時間間隔的所有事件將被傳遞給你,而不是一次一個。

因此,我相信你正在尋找一種解決方案,創建某種類型的節拍器剔除,並給你一個事件每個滴答。這可能是天真使用Observable.Interval節拍器和Zip它連接到您的源構成:

var source = GetInitSequence(); 
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));  
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now)); 

這將觸發每5秒(在上面的例子),並給你在序列中的原始項目。

這個解決方案的唯一問題是,如果你沒有更多的源元素(比如說)10秒,當源元素到達時,他們會立即發送出去,因爲一些'觸發器'事件正在在那裏等着他們。該場景的大理石圖:

source: -a-b-c----------------------d-e-f-g 
trigger: ----o----o----o----o----o----o----o 
result: ----a----b----c-------------d-e-f-g 

這是一個非常合理的問題。有兩個問題在這裏已經是解決它:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

提供的解決方案主要Drain擴展方法和二次Buffered擴展。我已經修改了這些更簡單(不需要Drain,只需使用Concat)。用法是:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5)); 

擴展方法StepInterval

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 
+0

謝謝。仍然不是我一直在尋找的東西,但它向我展示了一些想法。我只是對Rx感到沮喪 - 爲什麼它應該如此複雜並且沒有適當的文檔。學習曲線陡峭,需要對主題有廣泛的瞭解才能獲得有價值的東西。 #fail – IgorM

+1

同意。這就是爲什麼我花了很多時間寫IntroToRx.com來幫助你的職位。這很難,而且有很多東西需要學習。 –

+0

我真的發現這些Rx操作員很難閱讀和推理。我認爲這是我的侷限 - 這可能是因爲我有一個視覺的頭腦,我無法想象結果。是否有機會獲得這個答案中的代碼的大理石圖? –

0

如何Observable.Buffer?這應該將1s窗口中的所有事件作爲單個事件返回。

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5)); 
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); }); 

這可能是你所問的不清楚。你的代碼應該做什麼?看起來你只是通過爲每個事件創建一個計時器來延遲。它也打破了觀察者的語義,因爲下一個完成可能會發生。

注意這也只是在使用的計時器準確。典型的定時器精確到16ms。

編輯:

你的榜樣變得和項目包含窗口中的所有事件:

GetInitSequence() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 
1

我知道這可能只是太簡單,但將這項工作?

var intervaled = source.Do(x => { Thread.Sleep(100); }); 

基本上這只是在值之間放置一個最小延遲。太簡單了?

+0

這匹配和修復了OP的行爲IntervalThrottle是否真的明智? –

+1

Eeek ...阻止線程!?面對Rx校長,這種情況是對的? –

+0

是的,從它最純粹的意義上說,這是違反Rx的,但要求是阻止 –

1

隨着Enigmativity的答案的線,如果你想要做的只是延遲全部由一個時間跨度值的,我不能明白爲什麼Delay是不是你想要

GetInitSequence() 
     .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here 
     .Subscribe(
      item => 
       { 
        Console.WriteLine(DateTime.Now); 
        // Process item 
       } 
     ); 
相關問題