2011-12-21 25 views

回答

1

我會將observable包裹在一個observable中,該observable保證每分鐘至少返回一次值。包裝可以通過運行一個定時器來實現這一點,只要包裝的observable返回一個值就會重新啓動定時器。

因此,只要包裝的observable返回數據或在最後一個事件之後經過了一分鐘,包裝器就會返回數據。

應用程序的其餘部分方便地只是觀察包裝。

0

這裏有一個(非線程安全的,如果你的來源是多線程)實施RepeatAfterTimeout運營商:

編輯更新的超時如我所料

// Repeats the last value emitted after a timeout 
public static IObservable<TSource> RepeatAfterTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler) 
{ 
    return Observable.CreateWithDisposable<TSource>(observer => 
    { 
     var timer = new MutableDisposable(); 
     var subscription = new MutableDisposable(); 

     bool hasValue = false; 
     TSource lastValue = default(TSource); 

     timer.Disposable = scheduler.Schedule(recurse => 
     { 
      if (hasValue) 
      { 
       observer.OnNext(lastValue); 
      } 

      recurse(); 
     }); 

     subscription.Disposable = source 
      .Do(value => { lastValue = value; hasValue = true; }) 
      .Subscribe(observer); 

     return new CompositeDisposable(timer, subscription); 
    }); 
} 

public static IObservable<TSource> RepeatAfterTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout) 
{ 
    return source.RepeatAfterTimeout(timeout, Scheduler.TaskPool); 
} 
+0

我不認爲這將符合要求(即使您實際上在某處使用了超時)。您通過計時器每個週期輸出一個值,而不管來自源的活動。此外,您正在使用過時的版本或Rx。 – 2011-12-21 17:18:09

0

我有沒有工作完全相同的要求一次。如果在第一次超時之前沒有值觸發,我選擇使用默認值。下面是C#版本:

public static IObservable<T> 
AtLeastEvery<T>(this IObservable<T> source, TimeSpan timeout, 
       T defaultValue, IScheduler scheduler) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    if (scheduler == null) throw new ArgumentNullException("scheduler"); 
    return Observable.Create<T>(obs => 
     { 
      ulong id = 0; 
      var gate = new Object(); 
      var timer = new SerialDisposable(); 
      T lastValue = defaultValue; 

      Action createTimer =() => 
       { 
        ulong startId = id; 
        timer.Disposable = scheduler.Schedule(timeout, 
          self => 
          { 
           bool noChange; 
           lock (gate) 
           { 
            noChange = (id == startId); 
            if (noChange) obs.OnNext(lastValue); 
           } 
           //only restart if no change, otherwise 
           //the change restarted the timeout 
           if (noChange) self(timeout); 
          }); 
       }; 
      //start the first timeout 
      createTimer(); 
      var subscription = source.Subscribe(
       v => 
       { 
        lock (gate) 
        { 
         id += 1; 
         lastValue = v; 
        } 
        obs.OnNext(v); 
        createTimer(); //reset the timeout 
       }, 
       ex => 
       { 
        lock (gate) 
        { 
         id += 1; //'cancel' timeout 
        } 
        obs.OnError(ex); 
        //do not reset the timeout, because the sequence has ended 
       }, 
       () => 
       { 
        lock (gate) 
        { 
         id += 1; //'cancel' timeout 
        } 
        obs.OnCompleted(); 
        //do not reset the timeout, because the sequence has ended 
       }); 

      return new CompositeDisposable(timer, subscription); 
     }); 
} 

如果你不希望有每次通過調度,只是使挑選一個默認的和代表這種方法的重載。我用Scheduler.ThreadPool

此代碼的工作原理是使用SerialDisposable的行爲在來自源的新值進入時「取消」上一次超時調用。還有一個計數器用於計時器已經過去的情況(在這種情況下,處置從Schedule返回將無濟於事),但該方法尚未實際運行。

我研究了更改超時的可能性,但不需要它們解決我正在處理的問題。我記得這是可能的,但沒有任何代碼方便。

2

這裏是一個班輪,做你想做的。我測試過了,它似乎是正確的。

var results = source 
    .Publish(xs => 
     xs 
      .Select(x => 
       Observable 
        .Interval(TimeSpan.FromMinutes(1.0)) 
        .Select(_ => x) 
        .StartWith(x)) 
      .Switch()); 

讓我知道這是否有訣竅。

0

我認爲這應該工作在Rx方式(沒有遞歸,但仍涉及副作用):

public static IObservable<TSource> RepeatLastValueWhenIdle<TSource>(
     this IObservable<TSource> source, 
     TimeSpan idleTime, 
     TSource defaultValue = default(TSource)) 
    { 
     TSource lastValue = defaultValue; 

     return source 
      // memorize the last value on each new 
      .Do(ev => lastValue = ev) 
      // re-publish the last value on timeout 
      .Timeout(idleTime, Observable.Return(lastValue)) 
      // restart waiting for a new value 
      .Repeat(); 
    } 
相關問題