2013-09-22 16 views
3

在我一直在努力研究的下列課程的CreateRegistryObservable方法中,我無法弄清楚下列行分配的IDisposable可以乾淨地處理。問題在於,我在ActionAction內引用了它,它指定onComplete委託,它用{null}初始化來捕獲聲明的數組變量,並且在最後調用Schedule()時返回的內容未被反映出來這個Action的調用。我該如何處理調度程序乾淨?如何在我的代碼中處理IDisposable?

scheduler[0] = Scheduler.CurrentThread.Schedule(iterator); 

public class RegistryKeyMonitor<T> : IObservable<T> 
{ 
    private readonly RegistryKey _registryKey; 
    private readonly string _name; 
    private readonly TimeSpan _timeSpan; 
    private readonly ILog _logger = LogManager.GetLogger("RegistryKeyMonitor"); 

    public RegistryKeyMonitor(RegistryKey registryKey, string name, TimeSpan timeSpan) 
    { 
     _registryKey = registryKey; 
     _name = name; 
     _timeSpan = timeSpan; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     var sub =CreateRegistryObservable(_registryKey, _name, _timeSpan) 
      //.Catch(Observable.Empty<object>()) 
      .Where(obj => obj != null) 
      .Cast<T>(); 
     return sub.Subscribe(observer); 

    } 

    private IObservable<object> CreateValueChangeObservable(RegistryKey registryKey, string name, object initialValue, 
     TimeSpan timeSpan, CancellationToken token) 
    { 
     return Observable.Create<object>(o => 
      { 
       var scheduler = Scheduler.Immediate; 

       Action<object, Action<object, TimeSpan>> iterator = (current, self) => 
        { 
         try 
         { 
          if (token.IsCancellationRequested) 
          { 
           o.OnCompleted(); 
           return; 
          } 
          var value = registryKey.GetValue(name); 
          _logger.DebugFormat("{0} == registryKey.GetValue(name)", value); 
          if (current == null && value != null || 
           current != null && !current.Equals(value)) 
          { 
           _logger.DebugFormat("passing data {0}", value); 
           o.OnNext(value); 
          } 
          else 
           self(value, timeSpan); 
         } 
         catch (Exception e) 
         { 
          o.OnError(e); 
         } 
        }; 

       return scheduler.Schedule(initialValue, timeSpan, iterator); 
      }); 
    } 

    public IObservable<object> CreateRegistryObservable(RegistryKey registryKey, string name, TimeSpan timeSpan) 
    { 

     return Observable.Create<object>(o => 
      { 
       var cancel = new CancellationDisposable(); 

       return new CompositeDisposable(cancel, 
        NewThreadScheduler.Default.Schedule(() => 
         { 
          IDisposable[] scheduler = {null}; 
          var currentStateSubscription = new SerialDisposable(); 
          object currentValue = null; 

          Action<Action> iterator = self => 
           currentStateSubscription.Disposable = 
           CreateValueChangeObservable(registryKey, name, currentValue, 
                  timeSpan, cancel.Token) 
            .Subscribe(value => 
            { 
             currentValue = value; 
             self(); 
             o.OnNext(value); 
            }, 
            o.OnError, 
            ()=> { 
              currentStateSubscription.Dispose(); 
              scheduler[0].Dispose(); 
            } 
            ); 
          scheduler[0] = Scheduler.CurrentThread.Schedule(iterator); 
        })); 
      }); 
    } 
} 

回答

0

我剛剛弄明白了我自己,這是以下內容。

public IObservable<object> CreateRegistryObservable(RegistryKey registryKey, string name, TimeSpan timeSpan) 
    { 

     return Observable.Create<object>(o => 
      { 
       var cancel = new CancellationDisposable(); 

       var currentStateSubscription = new SerialDisposable(); 

       object currentValue = null; 

       return new CompositeDisposable(cancel, NewThreadScheduler.Default.Schedule(
        self => currentStateSubscription.Disposable = 
         CreateValueChangeObservable(registryKey, name, currentValue, 
                timeSpan, cancel.Token) 
          .Subscribe(value => 
          { 
           currentValue = value; 
           self(); 
           o.OnNext(value); 
          }, 
          o.OnError, 
          currentStateSubscription.Dispose 
          ) 
        )); 
      }); 
    } 
相關問題