2016-02-24 16 views
5

W mojej aplikacji WPF przy użyciu .Net 4.6 Mam zdarzenie, które uruchamia nowe punkty danych z wysoką szybkością (kilkaset na sekundę), ale nie cały czas. Te dane są wyświetlane na wykresie.Bufor Rx bez pustych połączeń z subskrybentem

Chciałbym aktualizować wykres co 50 ms, a nie po każdym nowym punkcie danych.
Aby to osiągnąć wykorzystałem Buffer(TimeSpan.FromMilliseconds(50)) z Rx, który teoretycznie działa dobrze. ALE mój abonent jest również wywoływany co 50 ms, jeśli nie są tworzone żadne nowe punkty danych, co nie jest dokładnie tym, czego chcę.

stworzyłem małą przykładową aplikację, aby sprawdzić, czy na zewnątrz:

using System; 
using System.Reactive.Linq; 

namespace RxTester 
{ 
    public class Program 
    { 
     private static event EventHandler TheEvent; 

     static void Main(string[] args) 
     { 
      var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h); 
      var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000)) 
       .Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received...")); 

      var random = new Random(); 
      var timer = new System.Timers.Timer(2000) 
       { 
        AutoReset = true, 
        Enabled = true 
       }; 
      timer.Elapsed += (s, e) => 
       { 
        var amount = random.Next(1, 10); 
        for (int i = 0; i < amount; ++i) 
         TheEvent?.Invoke(null, null); 
       }; 

      Console.ReadLine(); 

      timer.Enabled = false; 
      subscriber.Dispose(); 
     } 
    } 
} 

Trzeba dodać "Rx-Linq" pakiet Nuget na to, aby uruchomić lub kliknąć na poniższy Fiddle: https://dotnetfiddle.net/TV5tD4

There widzisz kilka "otrzymanych 0 elementów", których chciałbym uniknąć. Wiem, że mogłem prosty check dla e.Count == 0, ale ponieważ używam wielu takich buforów, nie wydaje mi się to optymalne.

Czy istnieje sposób tworzenia nowych buforowanych bloków elementów, jeśli elementy są dostępne? Jestem również otwarty na inne podejścia do rozwiązania mojego problemu związanego z dozowaniem zdarzeń na podstawie czasu - już sprawdziłem TPL Dataflows BatchBlock, ale wydaje się, że obsługuje to wyłącznie rozmiary bloków opartych na liczbie.

Odpowiedz

3

Po raz kolejny możemy użyć potężnego GroupByUntil metody tworzenia tego rozszerzenia

public static IObservable<IList<TSource>> BufferWhenAvailable<TSource> 
              (this IObservable<TSource> source, 
              TimeSpan threshold) 
{ 
    return source.Publish(sp => 
        sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold)) 
         .SelectMany(i => i.ToList())); 

} 
+0

Dzięki za to - wydaje się działać dobrze. BUT 'return source.GroupByUntil (_ => true, _ => Observable.Timer (próg)). SelectMany (g => g.ToList());' wydaje się również działać poprawnie - jaka jest korzyść z ' Observable.Create "wokół niego? – ChrFin

+0

@ChrFin w tym przypadku nie ma żadnej korzyści. Będę edytować odpowiedź :) – supertopi

+0

... a jeśli pomyślisz o tym, 'Observable.FromEvent' jest zawsze gorący. Więc 'Publish' również może zostać pominięty. – supertopi

1

standardowym sposobem postępowania jest po prostu

.Buffer(period) 
.Where(buffer=>buffer.Any()) 

Tak skutecznie robić to, co chcesz uniknąć (count==0). Jednak ten test jest bardzo tani i wyobrażam sobie, że jeśli jest dużo tańszy niż inne koszty, np. Planowanie. Jedynym problemem może być kwota alokacji, które się dzieje (co 50ms tworząc List<T>), a następnie zbliżające się ciśnienie GC Gen0, które może budować.

+0

Dziękuję za twoje zgłoszenie, ale "jedyną obawą może być kwota alokacji, która się dzieje", dlatego chcę tego uniknąć, ponieważ mogę mieć wiele z tych buforów jednocześnie ... – ChrFin

+2

Słyszę Cię. Wygląda na to, że dobrze byłoby dodać do skrzynki narzędziowej. Jeśli wymyślę soln do tego, skontaktuję się z Tobą https://github.com/LeeCampbell/RxCookbook/issues/27 –

+0

@LeeCampbell, jeśli rozumiem poprawnie: nawet jeśli zapobiegniesz "Buffer (TimeSpan) bufory, rozwiązanie jest wciąż inne. Bufory są "uruchamiane" na podstawie licznika czasu, a nie kiedy pojawia się nowa wartość. – supertopi