2015-07-11 11 views
5

Mam klasę, która bierze strumień wydarzeń i wypycha inny strumień wydarzeń.Z rozszerzeniami reaktywnymi (RX), czy możliwe jest dodanie polecenia "Przerwa"?

Wszystkie zdarzenia używają rozszerzeń reaktywnych (RX). Przychodzący strumień zdarzeń jest wypychany z zewnętrznego źródła do postaci IObserver<T> przy użyciu .OnNext, a wychodzący strumień zdarzeń jest wypychany przy użyciu IObservable<T> i .Subscribe. Używam Subject<T> do zarządzania tym, za kulisami.

Zastanawiam się, jakie techniki istnieją w RX, aby tymczasowo wstrzymać wyjście. Oznaczałoby to, że zdarzenia przychodzące narastałyby w wewnętrznej kolejce, a po ich wstrzymaniu zdarzenia miałyby być ponownie wyprowadzane.

+0

Myśląc, że jeśli wyjście jest wstrzymane, można przekierować wydarzenia w kolejce wewnętrznej, a gdy outout jest wstrzymane, to może opróżnić kolejkę na zewnątrz. – Contango

+0

Nie zaimplementowałeś własnego 'IObserver ', prawda? – Enigmativity

+0

Nie, wszystko, co zrobiłem, to wrzucenie wewnętrznego 'Subject ' do 'IObserver ', aby można było ujawnić metodę '.OnNext'. – Contango

Odpowiedz

1

Możesz symulować wstrzymywanie/anulowanie z Observable.

Gdy pauseObservable wyemituje "wstrzymaną" wartość, zdarzenia bufora do momentu, aż pauseObservable wyemituje wartość "wstrzymaną".

Oto przykład, który korzysta BufferUntil implementation by Dave Sexton i Observable logic by Timothy Shields (z własnego pytanie jakiś czas temu)

 //Input events, hot observable 
     var source = Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(i => i.ToString()) 
      .Publish().RefCount(); 

     //Simulate pausing from Keyboard, not actually relevant within this answer 
     var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
      k => KeyPressed += k, k => KeyPressed -= k) 
      .Select(i => i.EventArgs.PressedKey) 
      .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause 
      .DistinctUntilChanged(); 

     //Let events through when not paused 
     var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused}) 
      .Where(i => !i.paused) //not paused 
      .Select(i => i.value) 
      .Subscribe(Console.WriteLine); 

     //When paused, buffer until not paused 
     var pausedEvents = pauseObservable.Where(i => i) 
      .Subscribe(_ => 
       source.BufferUntil(pauseObservable.Where(i => !i)) 
        .Select(i => String.Join(Environment.NewLine, i)) 
        .Subscribe(Console.WriteLine)); 

Pokój dla poprawy: może scalić dwie subskrypcje do źródła (pausedEvents i notPausedEvents) jako jeden.

2

Oto rozsądnie prosty sposób Rx, aby zrobić to, co chcesz. Stworzyłem metodę rozszerzenia o nazwie Pausable, która ma źródło obserwowalne, a druga obserwowalną z boolean, która zatrzymuje lub wznawia obserwowalne.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source, 
    IObservable<bool> pauser) 
{ 
    return Observable.Create<T>(o => 
    { 
     var paused = new SerialDisposable(); 
     var subscription = Observable.Publish(source, ps => 
     { 
      var values = new ReplaySubject<T>(); 
      Func<bool, IObservable<T>> switcher = b => 
      { 
       if (b) 
       { 
        values.Dispose(); 
        values = new ReplaySubject<T>(); 
        paused.Disposable = ps.Subscribe(values); 
        return Observable.Empty<T>(); 
       } 
       else 
       { 
        return values.Concat(ps); 
       } 
      }; 

      return pauser.StartWith(false).DistinctUntilChanged() 
       .Select(p => switcher(p)) 
       .Switch(); 
     }).Subscribe(o); 
     return new CompositeDisposable(subscription, paused); 
    }); 
} 

Może być stosowany tak:

var xs = Observable.Generate(
    0, 
    x => x < 100, 
    x => x + 1, 
    x => x, 
    x => TimeSpan.FromSeconds(0.1)); 

var bs = new Subject<bool>(); 

var pxs = xs.Pausable(bs); 

pxs.Subscribe(x => { /* Do stuff */ }); 

Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 
Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 

Teraz jedyną rzeczą, nie mogłem się zorientować, co masz na myśli przez „przychodzącego strumienia zdarzeń jest IObserver<T>”. Strumienie to IObservable<T>. Obserwatorzy nie są strumieniami. Wygląda na to, że czegoś tu nie robisz. Czy możesz dodać swoje pytanie i wyjaśnić dalej?

+0

Dzięki za odpowiedź. Zaktualizowałem moje pytanie, aby przepływ danych był jaśniejszy. – Contango

+0

Jak widać, to najwyraźniej nie przepuszcza pierwszych wartości, ponieważ przechodzi przez fałszywą gałąź i chce połączyć "wartości" (co jest faktycznie "Observable.Nevever", jak sądzę?). Zhakowałem go na kształt poprzez zainicjowanie wartości 'null 'po raz pierwszy i sprawdzanie w obu gałęziach. Nie jestem pewien, czy jest coś bardziej eleganckiego. – Benjol

+0

Dalsze ostrzeżenie dla przyszłości. 'ReplaySubject' jest odrodzeniem diabła. Jeśli go nie ograniczysz (przez rozmiar bufora lub czas), pozostanie on na wszystko, co zobaczy, na wypadek, gdyby ktoś inny przyłączył się do subskrybowania. – Benjol

4

Oto moje rozwiązanie z użyciem operatorów buforze i okno:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser) 
{ 
    var queue = source.Buffer(pauser.Where(toPause => !toPause), 
           _ => pauser.Where(toPause => toPause)) 
         .SelectMany(l => l.ToObservable()); 

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
         _ => pauser.Where(toPause => !toPause)) 
       .Switch() 
       .Merge(queue); 
} 

okno jest otwarte na subskrypcję i za każdym razem „prawdziwy” jest odbierane ze strumienia Pauser. Zamyka się, gdy pauser dostarcza "fałszywą" wartość.

Bufor robi to, co powinien, buforuje wartości pomiędzy "false" i "true" z pauser. Gdy Buffer otrzyma "true", wyprowadza IList wartości, które są natychmiastowo przesyłane strumieniowo na raz.

DotNetFiddle Link: https://dotnetfiddle.net/vGU5dJ

+0

Prawdopodobnie musisz wykonać 'pauser.Publish (ps => {...})' i zamienić 'pauser' na' ps' w swoim kodzie, w przeciwnym razie tworzysz cztery subskrypcje 'pauser' i zależą od źródła 'pauser', który może spowodować awarię metody. – Enigmativity

+0

Tak, właśnie potwierdziłem. Tworzysz wiele subskrypcji. – Enigmativity

+0

Odnowiłem twój kod na https://dotnetfiddle.net/qWN72d – Enigmativity

Powiązane problemy