2013-08-03 52 views
5

我寫了一些代碼,將FileSystemWatcher的事件變成可觀察的序列。這個Reactive Extensions代碼是否會泄漏內存?

我的目標是兩次拆分所有文件系統的變化,以分離流並限制它們。

例如,如果我有10個不同的文件在半秒內變化3次,我只會爲每個文件得到一次通知。

雖然我擔心的是GroupBy()運營商。爲了這個工作,(我認爲)它需要隨着時間的推移不斷建立組,並消耗少量的內存。

這會造成「泄漏」,如果是這樣,我該如何防止它?

FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") { 
    EnableRaisingEvents = true, 
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size 
}; 

void Main() 
{ 
    var fileSystemEventStream = 
     Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs> 
      (
       _ => _watcher.Changed += _, 
       _ => _watcher.Changed -= _ 
      ) 
      .ObserveOn(ThreadPoolScheduler.Instance) 
      .SubscribeOn(ThreadPoolScheduler.Instance) 
      .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath) 
      ; 

    var res = 
     from fileGroup in fileSystemEventStream 
     from file in fileGroup.Throttle(TimeSpan.FromSeconds(1)) 
     select file; 

    res.Subscribe(
     ReceiveFsFullPath, 
     exception => { 
      Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace); 
     }); 

    Console.Read(); 
} 

void ReceiveFsFullPath(string s){ 
    Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId); 
    Console.WriteLine(s); 
} 
+0

如果你相信c#garnage收集器,而你不應該這樣做,那就不會有泄漏 –

+0

但是這個組織持續引用同一個文件。我明白C#是垃圾收集,但這並不意味着內存泄漏不會發生。 –

+0

您是否嘗試過添加[SciTech的Memory Profiler](http://memprofiler.com/)等探查器來查看您是否在泄漏? –

回答

3

是的,對於每個新密鑰,GroupBy創建一個Subject,並維護這些Subject的字典。你正在訂閱每一個這些。所以這是一小部分內存,隨着時間的推移,這些內存將不斷增加,無法發佈舊條目。當油門計時器到期時,您真正需要的是將鑰匙移除。我想不出與內置運營商做到這一點的方法。所以你需要一個自定義操作符。這是一個刺傷。

public IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay) 
{ 
    return Observable.Create(observer => 
    { 
     var notifications = new Subject<IObservable<T>>(); 
     var subscription = notifications.Merge().Subscribe(observer); 
     var d = new Dictionary<T, IObserver<T>>(); 
     var gate = new object(); 
     var sourceSubscription = new SingleAssignmentDisposable(); 
     var subscriptions = new CompositeDisposable(subscription, sourceSubscription); 
     sourceSubscription.Disposable = source.Subscribe(value => 
     { 
      IObserver<T> entry; 
      lock(gate) 
      { 
      if (d.TryGetValue(value, out entry)) 
      { 
       entry.OnNext(value); 
      } 
      else 
      { 
       var s = new Subject<T>(); 
       var o = s.Throttle(delay).FirstAsync().Do(() => 
       { 
       lock(gate) 
       { 
        d.Remove(value); 
       } 
       }); 
       notifications.OnNext(o); 
       d.Add(value, s); 
       s.OnNext(value); 
      } 
      } 
     }, observer.OnError, notifications.OnCompleted); 

     return subscriptions; 
    }); 
} 

... 
Observable.FromEventPattern(...) 
    .Select(e => e.EventArgs.FullPath) 
    .ThrottleDistinct(TimeSpan.FromSeconds(1)) 
    .Subscribe(...); 
+1

欣賞你花時間寫作答案的時間。我在MSDN論壇上看到了這個線程。 http://social.msdn.microsoft.com/Forums/en-US/53a66a85-4ab3-4fe9-96c6-8b72cc034d0e/groupbyuntil-usage可能有一個到期組的大小最大爲1的答案是?如果有內置機制,我想避免使用鎖編寫代碼:) –

+0

是的。我不知道GroupByUntil!它的API有點尷尬,但這應該工作:'filenames.GroupByUntil(f => f,g => g.Throttle(delay))。SelectMany(g => g.LastAsync()); – Brandon

1

根據布蘭登的回覆,主題將會增長並且無法被回收*。我在這裏泄漏的內存的主要關注點是你不捕獲訂閱!即

res.Subscribe(... 

必須

subscription = res.Subscribe(... 

更換,如果你不抓住訂閱,你永遠無法處置的認購,從而你永遠不放開的事件處理程序,因此你可以「泄漏內存」。顯然這是沒用的,如果你不在某處實際處理訂閱。

*那麼,如果他們完成了,那麼他們會自動處置,所以這將工作。當FileDeleted事件發生時您可以查看完成序列嗎?

+0

我正在捕獲訂閱,我只是沒有添加這些代碼。我基本上已經將查詢從我們的代碼庫中提取出來,並使LINQPad變得友好。即使如此,如果這個查詢運行的應用程序的生命和訂閱沒有結束,那麼我們有一個「泄漏」:) –

+0

感謝您的澄清。 GroupByUntil似乎是你正在尋找的東西(儘管你仍然沒有解釋你想在什麼條件下發布序列) –

+1

根據定義的問題,你可以釋放序列,只要它的空閒時間長於節流時間。在原始版本的代碼和使用「GroupByUntil」的版本之間的輸出序列中沒有「可見的」區別,除非後者在空閒時釋放資源。 – Brandon

相關問題