2012-04-02 19 views
7

Niestety, gdy tytuł nie jest bardzo jasne, nie mogłam myśleć o niczym lepszym ...W języku Rx, jak grupować najnowsze elementy po pewnym czasie?

ja odbierający dane wprowadzone przez użytkownika w postaci IObservable<char>, i chciałbym, aby przekształcić go do IObservable<char[]>, grupując znaki za każdym razem, gdy użytkownik przestaje pisać przez ponad 1 sekundę. Tak więc, na przykład, jeśli wejście jest w następujący sposób:

h 
e 
l 
l 
o 
(pause) 
w 
o 
r 
l 
d 
(pause) 
! 
(pause) 

Chciałbym wyjście zaobserwowania być:

['h', 'e', 'l', 'l', 'o'] 
['w', 'o', 'r', 'l', 'd'] 
['!'] 

Podejrzewam, że rozwiązanie jest dość proste, ale nie mogę znaleźć właściwe podejście ... Próbowałem użyć Buffer, GroupByUntil, Throttle i kilku innych, bezskutecznie.

Każdy pomysł byłby mile widziany!


EDIT: Mam coś, co prawie działa:

 _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1))) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

Ale muszę opóźnienie należy zresetować za każdym razem nowy znak jest wpisany ...

Odpowiedz

7

Buffer i Throttle byłoby mało, jeśli źródłem jest gorący. Aby było gorąco, możesz użyć .Publish().RefCount(), aby upewnić się, że skończysz z jedną subskrypcją do źródła.

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source, 
               TimeSpan dueTime) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    //defer dueTime checking to Throttle 
    var hot = source.Publish().RefCount(); 
    return hot.Buffer(() => hot.Throttle(dueTime)); 
} 
+0

Dzięki, działa świetnie i jest znacznie bardziej elegancko niż moje rozwiązanie. Właściwie to moje źródło jest już gorące (jest to "Temat ", który zasilam ze zdarzeń wejściowych); Nie jestem pewien jaki wpływ na użycie 'Publish(). RefCount()' jest ... –

+0

@ThomasLevesque Jeśli twoje źródło jest już gorąca, wierzę, że Publish/RefCount będzie po prostu zmarnowanymi warstwami otoki. Jeśli chcesz użyć tego jako funkcji ogólnego przypadku, prawdopodobnie po prostu zostawiłbym je, chyba że są one wyświetlane jako problem w twojej aplikacji.Jeśli użyjesz go tylko raz, zmień parametr na 'hotSource' i zostaw notatkę w komentarzach do dokumentu i powinieneś bezpiecznie usunąć Publish/RefCount. –

0

OK, znalazł rozwiązanie:

 Func<IObservable<char>> bufferClosingSelector = 
      () => 
      _input.Timeout(TimeSpan.FromSeconds(1)) 
        .Catch(Observable.Return('\0')) 
        .Where(i => i == '\0'); 
     _input.Buffer(bufferClosingSelector) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

Zasadniczo bufferClosingSelector pcha coś, gdy nastąpi przekroczenie limitu czasu, whi ch zamyka bieżący bufor. Prawdopodobnie jest to prostszy i bardziej elegancki sposób, ale działa ... Jestem otwarty na lepsze sugestie;)

0

Napisałem przedłużenie jakiś czas temu, aby zrobić to, co chcesz - BufferWithInactivity.

Oto ona:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source, 
    TimeSpan inactivity, 
    int maximumBufferSize) 
{ 
    return Observable.Create<IEnumerable<T>>(o => 
    { 
     var gate = new object(); 
     var buffer = new List<T>(); 
     var mutable = new SerialDisposable(); 
     var subscription = (IDisposable)null; 
     var scheduler = Scheduler.ThreadPool; 

     Action dump =() => 
     { 
      var bts = buffer.ToArray(); 
      buffer = new List<T>(); 
      if (o != null) 
      { 
       o.OnNext(bts); 
      } 
     }; 

     Action dispose =() => 
     { 
      if (subscription != null) 
      { 
       subscription.Dispose(); 
      } 
      mutable.Dispose(); 
     }; 

     Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = 
      onAction => 
      { 
       lock (gate) 
       { 
        dispose(); 
        dump(); 
        if (o != null) 
        { 
         onAction(o); 
        } 
       } 
      }; 

     Action<Exception> onError = ex => 
      onErrorOrCompleted(x => x.OnError(ex)); 

     Action onCompleted =() => onErrorOrCompleted(x => x.OnCompleted()); 

     Action<T> onNext = t => 
     { 
      lock (gate) 
      { 
       buffer.Add(t); 
       if (buffer.Count == maximumBufferSize) 
       { 
        dump(); 
        mutable.Disposable = Disposable.Empty; 
       } 
       else 
       { 
        mutable.Disposable = scheduler.Schedule(inactivity,() => 
        { 
         lock (gate) 
         { 
          dump(); 
         } 
        }); 
       } 
      } 
     }; 

     subscription = 
      source 
       .ObserveOn(scheduler) 
       .Subscribe(onNext, onError, onCompleted); 

     return() => 
     { 
      lock (gate) 
      { 
       o = null; 
       dispose(); 
      } 
     }; 
    }); 
} 
+0

Dzięki! Jednak nie jest to "prostsze" rozwiązanie niż moje;) –

0

To powinno działać. To nie jest tak zwięzłe jak twoje rozwiązanie, ponieważ implementuje logikę poprzez klasę zamiast metod rozszerzania, ale może to być lepszy sposób na zrobienie tego. W skrócie: za każdym razem, gdy dostajesz char, dodaj go do List i (ponownie) uruchom zegar, który wygaśnie w ciągu jednej sekundy; kiedy upłynie czas, powiadom naszych subskrybentów za pomocą tablicy List i zresetuj stan, aby był gotowy na następny raz.

class Breaker : IObservable<char[]>, IObserver<char> 
    { 
     List<IObserver<char[]>> observers = new List<IObserver<char[]>>(); 
     List<char> currentChars; 
     DispatcherTimer t; 
     public Breaker(IObservable<char> source) 
     { 
      source.Subscribe(this); 
      t = new DispatcherTimer { Interval = new TimeSpan(0, 0, 1) }; 
      t.Tick += TimerOver; 
      currentChars = new List<char>(); 
     } 
     public IDisposable Subscribe(IObserver<char[]> observer) 
     { 
      observers.Add(observer); 
      return null; //TODO return a useful IDisposable 
     } 
     public void OnCompleted() 
     { 
      //TODO implement completion logic 
     } 
     public void OnError(Exception e) 
     { 
      //TODO implement error logic 
     } 
     public void OnNext(char value) 
     { 
      currentChars.Add(value); 
      t.Start(); 
     } 
     void TimerOver(object sender, EventArgs e) 
     { 
      char[] chars = currentChars.ToArray(); 
      foreach (var obs in observers) 
       obs.OnNext(chars); 
      currentChars.Clear(); 
      t.Stop(); 
     } 
    } 
Powiązane problemy