2013-08-03 14 views
5

Napisałem kod, który zmienia zdarzenie FileSystemWatcher w obserwowalną sekwencję.Czy ten kod Reactive Extensions wycieka z pamięci?

Moim celem jest podzielenie wszystkich zmian w systemie plików w celu oddzielenia strumieni i ich dławienia.

Na przykład, jeśli mam 10 różnych plików, które zmieniają się 3 razy w ciągu pół sekundy, otrzymam notyfikację tylko raz dla każdego pliku.

Co mnie jednak niepokoi, to operator GroupBy(). Aby to zadziałało (zakładam), musiałoby z czasem budować grupę i zużywać niewielkie ilości pamięci.

Czy spowoduje to "wyciek", a jeśli tak, w jaki sposób mogę temu zapobiec?

FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") { 
    EnableRaisingEvents = true, 
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size 
}; 

void Main() 
{ 
    var fileSystemEventStream = 
     Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs> 
      (
       _ => _watcher.Changed += _, 
       _ => _watcher.Changed -= _ 
      ) 
      .ObserveOn(ThreadPoolScheduler.Instance) 
      .SubscribeOn(ThreadPoolScheduler.Instance) 
      .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath) 
      ; 

    var res = 
     from fileGroup in fileSystemEventStream 
     from file in fileGroup.Throttle(TimeSpan.FromSeconds(1)) 
     select file; 

    res.Subscribe(
     ReceiveFsFullPath, 
     exception => { 
      Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace); 
     }); 

    Console.Read(); 
} 

void ReceiveFsFullPath(string s){ 
    Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId); 
    Console.WriteLine(s); 
} 
+0

Jeśli wierzysz w kolekcjonera C# garnage, a nie powinieneś, nie ma przecieku. –

+0

Ale grupa utrzymuje odniesienie do tego samego pliku w czasie. Rozumiem, że C# jest zbierane śmieci, ale to nie znaczy, że wycieki pamięci się nie zdarzają. –

+0

Czy próbowałeś dołączyć profilera, takiego jak [SciTech's Profiler pamięci] (http://memprofiler.com/), aby sprawdzić, czy nie ma wycieków? –

Odpowiedz

3

Tak, dla każdego nowego klucza GroupBy tworzy temat i utrzymuje słownik tych tematów. I subskrybujesz każdy z nich. Jest to więc niewielka porcja pamięci, która z upływem czasu będzie rosnąć, bez uprzedniego wypuszczenia starych wpisów. To, czego naprawdę potrzebujesz, to usunięcie klucza po upływie czasu zegara przepustnicy. Nie mogę wymyślić, jak to zrobić z wbudowanymi operatorami. Potrzebujesz więc niestandardowego operatora. Oto ukłucie w jednym.

public IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay) 
{ 
    return Observable.Create(observer => 
    { 
     var notifications = new Subject<IObservable<T>>(); 
     var subscription = notifications.Merge().Subscribe(observer); 
     var d = new Dictionary<T, IObserver<T>>(); 
     var gate = new object(); 
     var sourceSubscription = new SingleAssignmentDisposable(); 
     var subscriptions = new CompositeDisposable(subscription, sourceSubscription); 
     sourceSubscription.Disposable = source.Subscribe(value => 
     { 
      IObserver<T> entry; 
      lock(gate) 
      { 
      if (d.TryGetValue(value, out entry)) 
      { 
       entry.OnNext(value); 
      } 
      else 
      { 
       var s = new Subject<T>(); 
       var o = s.Throttle(delay).FirstAsync().Do(() => 
       { 
       lock(gate) 
       { 
        d.Remove(value); 
       } 
       }); 
       notifications.OnNext(o); 
       d.Add(value, s); 
       s.OnNext(value); 
      } 
      } 
     }, observer.OnError, notifications.OnCompleted); 

     return subscriptions; 
    }); 
} 

... 
Observable.FromEventPattern(...) 
    .Select(e => e.EventArgs.FullPath) 
    .ThrottleDistinct(TimeSpan.FromSeconds(1)) 
    .Subscribe(...); 
+1

Doceń czas poświęcony na pisanie odpowiedzi. Widziałem ten wątek na forum MSDN. http://social.msdn.microsoft.com/Forums/en-US/53a66a85-4ab3-4fe9-96c6-8b72cc034d0e/groupbyuntil-usage Czy posiadanie wygasającej grupy o maksymalnej wielkości grupy może być odpowiedzią? Chciałbym uniknąć pisania kodu z blokadami, jeśli jest wbudowany mechanizm :) –

+0

Tak. Nie wiedziałem o GroupByUntil! To API jest trochę niezręczne, ale to powinno działać: 'file.GroupByUntil (f => f, g => g.Throttle (delay)). – Brandon

1

Zgodnie z odpowiedzią Brandona tematy będą rosły i nie będą mogły zostać odzyskane *. Moją główną obawą związaną z przeciekającą pamięcią jest to, że nie przechwytujesz subskrypcji! tj

res.Subscribe(... 

należy zastąpić

subscription = res.Subscribe(... 

jeśli nie uchwycić subskrypcję, nigdy nie można wyrzucać z abonamentu, a tym samym nie puścisz obsługi zdarzeń, co masz „wyciekły pamięć ". Oczywiście to nie ma sensu, jeśli w rzeczywistości nie dysponujesz abonamentem.

* Cóż, jeśli skończą, zostaną automatycznie unieszkodliwione, tak by działały. Po zakończeniu zdarzenia FileDeleted możesz wykonać sekwencję?

+0

Przechwytywam subskrypcję, po prostu nie dodałem tego fragmentu kodu. Zasadniczo wziąłem zapytanie z naszej bazy kodu i sprawiłem, że LINQPad był przyjazny, to wszystko. Mimo to, jeśli zapytanie trwa przez cały czas trwania aplikacji, a subskrypcja się nie kończy, mamy "wyciek" :) –

+0

Dzięki za wyjaśnienia. GroupByDo tego, co wydaje się być tym, czego szukasz (nawet jeśli nadal musisz wyjaśnić, w jakich warunkach chcesz zwolnić sekwencje) –

+1

Z definicji podanego problemu możesz zwolnić sekwencję, gdy jest ona bezczynna dłużej niż okres dławienia. Nie ma "widocznej" różnicy w sekwencji wyjściowej między oryginalną wersją kodu a wersją używającą 'GroupByUntil', z wyjątkiem tego, że uwolni zasoby, gdy stan jałowy. – Brandon