2013-12-21 48 views
3

我的要求:如何創建調用方法的Observable Timer並在方法正在運行時阻止取消,直到完成?

  1. 運行方法DoWork在指定的時間間隔。
  2. 如果在DoWork的呼叫之間呼叫停止,則只需停止計時器。
  3. 如果在DoWork正在運行時調用stop,則阻塞直到DoWork完成。
  4. 如果DoWork在調用停止後超時完成,超時。

我有一個解決方案,似乎工作到目前爲止,但我不是超級滿意,並認爲我可能會失去一些東西。下面是從我的測試應用程序的無效的主要:

var source = new CancellationTokenSource(); 

// Create an observable sequence for the Cancel event. 
var cancelObservable = Observable.Create<Int64>(o => 
{ 
    source.Token.Register(() => 
    { 
     Console.WriteLine("Start on canceled handler."); 
     o.OnNext(1); 
     Console.WriteLine("End on canceled handler."); 
    }); 

    return Disposable.Empty; 
}); 

var observable = 
    // Create observable timer. 
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default) 
     // Merge with the cancel observable so we have a composite that 
     // generates an event every 10 seconds AND immediately when a cancel is requested. 
     .Merge(cancelObservable) 
     // This is what I ended up doing instead of disposing the timer so that I could wait 
     // for the sequence to finish, including DoWork. 
     .TakeWhile(i => !source.IsCancellationRequested) 
     // I could put this in an observer, but this way exceptions could be caught and handled 
     // or the results of the work could be fed to a subscriber. 
     .Do(l => 
     { 
      Console.WriteLine("Start DoWork."); 
      Thread.Sleep(TimeSpan.FromSeconds(5)); 
      Console.WriteLine("Finish DoWork."); 
     }); 

var published = observable.Publish(); 

var disposable = published.Connect(); 

// Press key between Start DoWork and Finish DoWork to test the cancellation while 
// running DoWork. 
// Press key between Finish DoWork and Start DoWork to test cancellation between 
// events. 
Console.ReadKey(); 

// I doubt this is good practice, but I was finding that o.OnNext was blocking 
// inside of register, and the timeout wouldn't work if I blocked here before 
// I set it up. 
Task.Factory.StartNew(source.Cancel); 

// Is there a preferred way to block until a sequence is finished? My experience 
// is there's a timing issue if Cancel finishes fast enough the sequence may already 
// be finished by the time I get here and .Wait() complains that the sequence contains 
// no elements. 
published.Timeout(TimeSpan.FromSeconds(1)) 
    .ForEach(i => { }); 

disposable.Dispose(); 

Console.WriteLine("All finished! Press any key to continue."); 
Console.ReadKey(); 

回答

4

首先,在你cancelObservable,確保並返回Token.Register結果作爲您的可支配而不是返回Disposable.Empty

下面是轉向CancellationTokens到可觀了良好的擴展方法:

public static IObservable<Unit> AsObservable(this CancellationToken token, IScheduler scheduler) 
{ 
    return Observable.Create<Unit>(observer => 
    { 
     var d1 = new SingleAssignmentDisposable(); 
     return new CompositeDisposable(d1, token.Register(() => 
      { 
       d1.Disposable = scheduler.Schedule(() => 
       { 
        observer.OnNext(Unit.Default); 
        observer.OnCompleted(); 
       }); 
      })); 
    }); 
} 

現在,您的實際要求:

public IObservable<Unit> ScheduleWork(IObservable<Unit> cancelSignal) 
{ 
    // Performs work on an interval 
    // stops the timer (but finishes any work in progress) when the cancelSignal is received 
    var workTimer = Observable 
     .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10)) 
     .TakeUntil(cancelSignal) 
     .Select(_ => 
     { 
      DoWork(); 
      return Unit.Default; 
     }) 
     .IgnoreElements(); 

    // starts a timer after cancellation that will eventually throw a timeout exception. 
    var timeoutAfterCancelSignal = cancelSignal 
     .SelectMany(c => Observable.Never<Unit>().Timeout(TimeSpan.FromSeconds(5))); 

    // Use Amb to listen to both the workTimer 
    // and the timeoutAfterCancelSignal 
    // Since neither produce any data we are really just 
    // listening to see which will complete first. 
    // if the workTimer completes before the timeout 
    // then Amb will complete without error. 
    // However if the timeout expires first, then Amb 
    // will produce an error 
    return Observable.Amb(workTimer, timeoutAfterCancelSignal); 
} 

// Usage 
var cts = new CancellationTokenSource(); 
var s = ScheduleWork(cts.Token.AsObservable(Scheduler.Default)); 

using (var finishedSignal = new ManualResetSlim()) 
{ 
    s.Finally(finishedSignal.Set).Subscribe(
     _ => { /* will never be called */}, 
     error => { /* handle error */ }, 
     () => { /* canceled without error */ }); 

    Console.ReadKey(); 
    cts.Cancel(); 

    finishedSignal.Wait(); 
} 

注意,而不是取消標記,你也可以這樣做:

var cancelSignal = new AsyncSubject<Unit>(); 
var s = ScheduleWork(cancelSignal); 

// .. to cancel .. 
Console.ReadKey(); 
cancelSignal.OnNext(Unit.Default); 
cancelSignal.OnCompleted(); 
+0

感謝布蘭登!我不知道爲什麼使用TakeUntil打斷計時器的概念沒有出現在我身上,這完全有道理。使用Amb聽兩個可觀察對象也很酷。您的示例與AsyncSubject一起工作良好,但是當我將Thread.Sleep添加到DoWork以模擬工作時,CancellationToken示例在超時時未按預期方式運行,則直到DoWork完成後纔會調用取消阻止。當我將TakeUntil(cancelSignal)更改爲TakeUntil(cancelSignal.ObserveOn(Scheduler.Default))時,一切都很順利。我將相應地編輯您的示例,如果錯誤請撤消。 – MichaC

+1

而不是在你的'TakeUntil'調用中嵌入'ObserveOn',通過修改'ToObservable'幫助器方法來接受一個'IScheduler'來保持關注點分離和更多的'Rx-y'令牌被取消。我已經修改了答案。 – Brandon

相關問題