2011-06-19 41 views
2

我的場景: 我有一個計算應該每秒運行一次。運行後,應該等待大約200ms,以便其他內容跟上。如果計算器在一秒鐘後仍然運行,應該再次啓動,但是程序應該等待直到完成,並在完成後200ms開始下一次計算。我怎樣才能減慢Observable而不丟掉RX中的值?

的方式我現在這樣做:

_refreshFinished = new Subject<bool>(); 
_autoRefresher = Observable.Interval(TimeSpan.FromMilliseconds(1000)) 
    .Zip(_refreshFinished, (x,y) => x) 
    .Subscribe(x => AutoRefresh(stuff)); 

這段代碼的問題是,我看不到出路擺在延遲一段時間後計算完成。 Delay方法只延遲可觀察集合的第一個元素。通常這種行爲是正確的,因爲如果你想緩衝每個人,你將不得不緩衝大量的元素,但是由於將調用Autorefesh的時間延遲了200ms,_refreshFinished的輸出延遲了200ms,並且沒有緩衝區開銷。 基本上我想要一個可視化的每一個MaxTime(some_call,1000ms)觸發,然後延遲200ms甚至更好,一些動態值。在這一點上,我甚至沒有真正關心通過這種方式運行的價值觀,儘管未來它可能會發生變化。

I'm開放給任何建議

回答

3

Observable.Generate()工作有許多重載,這將讓你動態調整的時間,其中的下一個項目被建造。

例如

IScheduler schd = Scheduler.TaskPool; 
var timeout = TimeSpan.FromSeconds(1); 
var shortDelay = TimeSpan.FromMilliseconds(200); 
var longerDelay = TimeSpan.FromMilliseconds(500); 
Observable.Generate(schd.Now, 
        time => true, 
        time => schd.Now, 
        time => new object(), // your code here 
        time => schd.Now.Subtract(time) > timeout ? shortDelay : longerDelay , 
        schd); 
+0

你在倒數第二行有一個錯字。它應該是暫停而不是時間。但我真的很喜歡你的解決方案,它很簡單,然後我在下面發佈 – LDomagala

1

這聽起來更像是爲新的異步架構http://msdn.microsoft.com/en-us/vstudio/gg316360

+0

請注意詳細說明爲什麼這將成爲新的異步框架的工作? –

+0

因爲他基本上正在等待一些計算完成,所以聽起來像這樣做會更容易,等到完成一個計算。另外,「等待其他東西趕上」聽起來像是等待多個異步作業完成,這正是異步框架的用途。如果工作沒有在特定的時間內完成,只需投入一些超時並相應地處理它們。 –

1

有一種方法可以做到這一點。它不是最容易的事情,因爲等待時間必須根據每個值進行動態計算,但它的工作原理並且非常通用。

當您使用此代碼時,您可以插入應該在YOURCODE中調用的代碼,其他所有代碼都會自動運行。你的代碼將基本上被稱爲每個最大值(yourCodeTime + extraDelay,usualCallTime + extraDelay)。這意味着你的代碼不會被同時調用兩次,應用程序總是會有額外的時間去做其他的事情。 如果有一些更容易/其他方式來做到這一點,我會聽到它。

double usualCallTime = 1000; 
double extraDealy = 100; 
var subject = new Subject<double>(); 
var subscription = 
    sub.TimeInterval() 
     .Select(x => 
      { 
       var processingTime = x.Interval.TotalMilliseconds - x.Value; 
       double timeToWait = 
        Math.Max(0, usualCallTime - processingTime) + extraDelay; 
       return Observable.Timer(TimeSpan.FromMilliseconds(timeToWait)) 
        .Select(ignore => timeToWait); 
      }) 
     .Switch() 
     .Subscribe(x => {YOURCODE();sub.OnNext(x)}); 
sub.OnNext(0); 

private static void YOURCODE() 
{ 
    // do stuff here 
    action.Invoke(); 
} 
0

如果我正確理解你的問題,你有一個長期磨合的計算功能,例如這樣的:

static String compute() 
{ 
    int t = 300 + new Random().Next(1000); 
    Console.Write("[{0}...", t); 
    Thread.Sleep(t); 
    Console.Write("]"); 
    return Guid.NewGuid().ToString(); 
} 

你想每秒,但沒有重疊的電話至少一次調用這個函數,兩次呼叫之間的恢復時間至少爲200ms。下面的代碼適用於這種情況。

我從一個更實用的方法開始(使用Scan()Timestamp()),更多的是Rx風格 - 因爲我在尋找一個很好的Rx練習 - 但最終,這種非聚合方法更簡單。

static void Main() 
{ 
    TimeSpan period = TimeSpan.FromMilliseconds(1000); 
    TimeSpan recovery = TimeSpan.FromMilliseconds(200); 

    Observable 
     .Repeat(Unit.Default) 
     .Select(_ => 
     { 
      var s = DateTimeOffset.Now; 
      var x = compute(); 
      var delay = period - (DateTimeOffset.Now - s); 
      if (delay < recovery) 
       delay = recovery; 

      Console.Write("+{0} ", (int)delay.TotalMilliseconds); 

      return Observable.Return(x).Delay(delay).First(); 
     }) 
     .Subscribe(Console.WriteLine); 
} 

這裏的輸出:

[1144...]+200 a7cb5d3d-34b9-4d44-95c9-3e363f518e52 
[1183...]+200 359ad966-3be7-4027-8b95-1051e3fb20c2 
[831...]+200 f433b4dc-d075-49fe-9c84-b790274982d9 
[766...]+219 310c9521-7bee-4acc-bbca-81c706a4632a 
[505...]+485 0715abfc-db9b-42e2-9ec7-880d7ff58126 
[1244...]+200 30a3002a-924a-4a64-9669-095152906d85 
[1284...]+200 e5b1cd79-da73-477c-bca0-0870f4b5c640 
[354...]+641 a43c9df5-53e8-4b58-a0df-7561cf4b0483 
[1094...]+200 8f25019c-77a0-4507-b05e-c9ab8b34bcc3 
[993...]+200 840281bd-c8fd-4627-9324-372636f8dea3 

[編輯:此示例使用的Rx 2.0(RC)2.0.20612。0]

0

假設你有一個現成的 '的IObservable',然後下面的工作

var delay = TimeSpan.FromSeconds(1.0); 
var actual = source.Scan(
    new ConcurrentQueue<object>(), 
    (q, i) => 
     { 
      q.Enqueue(i); 
      return q; 
     }).CombineLatest(
      Observable.Interval(delay), 
      (q, t) => 
       { 
        object item; 
        if (q.TryDequeue(out item)) 
        { 
         return item; 
        } 

        return null; 
       }).Where(v => v != null); 

'實際' 是你得到的觀測。但請記住,如果上面的代碼已經不熱了,那麼它就變成了一個Hot observable。所以你不會得到'OnCompleted'調用。