2010-07-09 18 views
25

Chcę skutecznie ograniczać strumień zdarzeń, aby mój uczestnik został wywołany po odebraniu pierwszego zdarzenia, ale nie przez 1 sekundę, jeśli zostaną odebrane kolejne zdarzenia. Po upływie tego limitu czasu (1 sekunda), jeśli zostało odebrane kolejne wydarzenie, chcę, aby został wezwany mój pełnomocnik.Jak ograniczać strumień zdarzeń za pomocą RX?

Czy istnieje prosty sposób na wykonanie tego przy użyciu rozszerzeń reaktywnych?

Przykładowy kod:

static void Main(string[] args) 
{ 
    Console.WriteLine("Running..."); 

    var generator = Observable 
     .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
     .Timestamp(); 

    var builder = new StringBuilder(); 

    generator 
     .Sample(TimeSpan.FromSeconds(1)) 
     .Finally(() => Console.WriteLine(builder.ToString())) 
     .Subscribe(feed => 
        builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}", 
                feed.Value, 
                feed.Timestamp.ToString("mm:ss.fff"), 
                DateTime.Now.ToString("mm:ss.fff")))); 

    Console.ReadKey(); 
} 

Prąd wyjściowy:

Running... 
Observed 064, generated at 41:43.602, observed at 41:43.602 
Observed 100, generated at 41:44.165, observed at 41:44.602 

Ale chcę obserwować (znaczniki czasu oczywiście zmieni)

Running... 
Observed 001, generated at 41:43.602, observed at 41:43.602 
.... 
Observed 100, generated at 41:44.165, observed at 41:44.602 
+14

To tylko chłodny rachunek lambda 'x => x <= 100';) – Oliver

Odpowiedz

11

Oto co mam z pewną pomocą Forum RX:

Chodzi o to, aby wydać serię „bilety” dla oryginalna sekwencja na ogień. Te "bilety" są opóźnione z powodu przekroczenia limitu czasu, z wyłączeniem pierwszego, który jest natychmiastowy do sekwencji zgłoszeń. Kiedy nadchodzi wydarzenie i czeka bilet, wydarzenie natychmiast się odpala, w przeciwnym razie czeka na bilet, a następnie strzela. Po uruchomieniu, wydany zostaje następny bilet, i tak dalej ...

Aby połączyć bilety i oryginalne wydarzenia, potrzebujemy kombinatora. Niestety, "standard" .CombineLatest nie może być tutaj użyty, ponieważ byłby to atak na bilety i wydarzenia, które były używane wcześniej. Musiałem więc stworzyć własny kombinator, który jest w zasadzie filtrowanym .CombineLatest, który strzela tylko wtedy, gdy oba elementy w kombinacji są "świeże" - nigdy wcześniej nie zostały zwrócone. Ja nazywam to .CombineVeryLatest aka .BrokenZip;)

Korzystanie .CombineVeryLatest powyższy pomysł może zostać zrealizowany jako takie:

public static IObservable<T> SampleResponsive<T>(
     this IObservable<T> source, TimeSpan delay) 
    { 
     return source.Publish(src => 
     { 
      var fire = new Subject<T>(); 

      var whenCanFire = fire 
       .Select(u => new Unit()) 
       .Delay(delay) 
       .StartWith(new Unit()); 

      var subscription = src 
       .CombineVeryLatest(whenCanFire, (x, flag) => x) 
       .Subscribe(fire); 

      return fire.Finally(subscription.Dispose); 
     }); 
    } 

    public static IObservable<TResult> CombineVeryLatest 
     <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, 
     IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) 
    { 
     var ls = leftSource.Select(x => new Used<TLeft>(x)); 
     var rs = rightSource.Select(x => new Used<TRight>(x)); 
     var cmb = ls.CombineLatest(rs, (x, y) => new { x, y }); 
     var fltCmb = cmb 
      .Where(a => !(a.x.IsUsed || a.y.IsUsed)) 
      .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; }); 
     return fltCmb.Select(a => selector(a.x.Value, a.y.Value)); 
    } 

    private class Used<T> 
    { 
     internal T Value { get; private set; } 
     internal bool IsUsed { get; set; } 

     internal Used(T value) 
     { 
      Value = value; 
     } 
    } 

Edit: oto kolejny bardziej zwarta odmiana CombineVeryLatest proponowany przez Andreas Kopf na forum :

public static IObservable<TResult> CombineVeryLatest 
    <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, 
    IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) 
{ 
    return Observable.Defer(() => 
    { 
     int l = -1, r = -1; 
     return Observable.CombineLatest(
      leftSource.Select(Tuple.Create<TLeft, int>), 
      rightSource.Select(Tuple.Create<TRight, int>), 
       (x, y) => new { x, y }) 
      .Where(t => t.x.Item2 != l && t.y.Item2 != r) 
      .Do(t => { l = t.x.Item2; r = t.y.Item2; }) 
      .Select(t => selector(t.x.Item1, t.y.Item1)); 
    }); 
} 
0

Czy próbowałeś metody Throttle przedłużacz?

Od docs:

ignoruje wartości od obserwowanej sekwencji, które są następnie przez inną wartość przed dueTime

To nie jest dla mnie jasne, czy zrobi to, co chcesz, czy nie - w tym przypadku chcesz zignorować następujące wartości, a nie pierwszą wartość ... ale chciałbym oczekiwać, że będzie to, co chcesz,. Wypróbuj :)

EDYTOWANIE: Hmmm ... nie, nie sądzę, że Throttlejest co jest słuszne. Wierzę, że widzę, co chcesz zrobić, ale nie widzę w tym nic, co mogłoby to zrobić. Może jednak coś przeoczyłem. Czy zapytałeś na forum Rx? Równie dobrze może być to, że jeśli nie jest tam teraz, oni chętnie go :) dodać

I podejrzany można zrobić to sprytnie z SkipUntil i SelectMany jakoś ... ale myślę, że powinno być w jej własna metoda.

+0

Dzięki Jon. Dałem mu szansę, ale nie do końca to, czego chcę. W tym przykładzie użycie funkcji przepustnicy powoduje zignorowanie wszystkich zdarzeń oprócz ostatniego. Muszę zareagować na pierwsze zdarzenie (aby zapewnić system reagujący), ale potem opóźnić 1-sekundową częstotliwość próbkowania dla kolejnych zdarzeń. – Alex

+0

@Alex: Tak, to też znalazłem. (Zobacz moją edycję). –

0

To, czego szukasz, to CombineLatest.

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource, 
    IObservable<TRight> rightSource, 
    Func<TLeft, TRight, TResult> selector 
) 

która łączy 2 obiekty obeservables i zwraca wszystkie wartości, gdy selektor (czas) ma wartość.

edit: John ma rację, że nie może być preferowanym rozwiązaniem

+0

Nie rozumiem, po co to jest - co widzisz tutaj jako dwie obserwacje? –

+0

Jednym z nich są generowane zdarzenia, selektor jest obserwowalny. Interwał (TimeSpan.FromSeconds (1)) – cRichter

+0

Czy to nie wygeneruje zdarzenia za każdym razem, gdy * albo * z nich powstanie wartość? –

9

Dobra,

masz 3 scenariusze tutaj:

1) chciałbym dostać jedna wartość strumień zdarzeń co sekundę. oznacza: że jeśli wygeneruje więcej zdarzeń na sekundę, otrzymasz zawsze większy bufor.

observableStream.Throttle(timeSpan) 

2) Chciałbym, aby uzyskać najnowsze zdarzenie, które zostało wyprodukowane przed drugą dzieje środki: Inne wydarzenia zrzucane.

observableStream.Sample(TimeSpan.FromSeconds(1)) 

3) chcesz uzyskać wszystkie zdarzenia, które miały miejsce w ostatniej sekundzie. i że co drugi

observableStream.BufferWithTime(timeSpan) 

4) Aby wybrać to, co dzieje się między drugim ze wszystkimi wartościami, aż drugi minął, a wynik jest zwracany

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent) 
+1

Dang, scenariusz 2 jest dokładnie tym, czego szukam, i nie byłem w stanie znaleźć metody :( – deadlydog

+1

Jeśli ktoś potrzebuje: 'stream. Sample (TimeSpan.FromSeconds (1)) ' – AlexFoxGill

2

Ok, oto jeden rozwiązanie. Nie podoba mi się to szczególnie, ale ... no cóż.

Porady Hat dla Jona za wskazanie mnie w programie SkipWhile, a także cRichtera za BufferWithTime. Dzięki chłopaki.

static void Main(string[] args) 
{ 
    Console.WriteLine("Running..."); 

    var generator = Observable 
     .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
     .Timestamp(); 

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1)); 

    var action = new Action<Timestamped<int>>(
     feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}", 
            feed.Value, 
            feed.Timestamp.ToString("mm:ss.fff"), 
            DateTime.Now.ToString("mm:ss.fff"))); 

    var reactImmediately = true; 
    bufferedAtOneSec.Subscribe(list => 
            { 
             if (list.Count == 0) 
             { 
              reactImmediately = true; 
             } 
             else 
             { 
              action(list.Last()); 
             } 
            }); 
    generator 
     .SkipWhile(item => reactImmediately == false) 
     .Subscribe(feed => 
         { 
          if(reactImmediately) 
          { 
           reactImmediately = false; 
           action(feed); 
          } 
         }); 

    Console.ReadKey(); 
} 
5

To co napisałem jako odpowiedź na to pytanie w Rx forum:

UPDATE: Oto nowa wersja, która nie spowoduje opóźnień przekazywanie zdarzeń podczas zdarzenia z czasem różnica ponad jedną sekundę:

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval) 
{ 
    return Observable.CreateWithDisposable<T>(o => 
    { 
     object gate = new Object(); 
     Notification<T> last = null, lastNonTerminal = null; 
     DateTime referenceTime = DateTime.UtcNow - minInterval; 
     var delayedReplay = new MutableDisposable(); 
     return new CompositeDisposable(source.Materialize().Subscribe(x => 
     { 
      lock (gate) 
      { 
       var elapsed = DateTime.UtcNow - referenceTime; 
       if (elapsed >= minInterval && delayedReplay.Disposable == null) 
       { 
        referenceTime = DateTime.UtcNow; 
        x.Accept(o); 
       } 
       else 
       { 
        if (x.Kind == NotificationKind.OnNext) 
         lastNonTerminal = x; 
        last = x; 
        if (delayedReplay.Disposable == null) 
        { 
         delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() => 
         { 
          lock (gate) 
          { 
           referenceTime = DateTime.UtcNow; 
           if (lastNonTerminal != null && lastNonTerminal != last) 
            lastNonTerminal.Accept(o); 
           last.Accept(o); 
           last = lastNonTerminal = null; 
           delayedReplay.Disposable = null; 
          } 
         }, minInterval - elapsed); 
        } 
       } 
      } 
     }), delayedReplay); 
    }); 
} 

To był mój wcześniej try:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
    .Timestamp(); 

source.Publish(o => 
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1))) 
).Run(x => Console.WriteLine(x)); 
+4

Po prostu znalazłem link do tej odpowiedzi w napisach Microsoft Todo UWP :) –

6

męczyłem się z tym samym problemem w nocy, i wierzę, że znalazłem bardziej elegancki (lub przynajmniej krótszy) rozwiązanie:

var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1)); 
var throttledSource = source.Take(1).Concat(delay).Repeat(); 
0

Zainspirowany przez odpowiedź Bluelings, zapewniam tutaj wersję, która kompiluje się z Reactive Extensions 2.2.5.

Ta konkretna wersja zlicza liczbę próbek, a także podaje ostatnią próbkowaną wartość.Aby to zrobić następujące klasy służy:

class Sample<T> { 

    public Sample(T lastValue, Int32 count) { 
    LastValue = lastValue; 
    Count = count; 
    } 

    public T LastValue { get; private set; } 

    public Int32 Count { get; private set; } 

} 

Oto operator:

public static IObservable<Sample<T>> SampleResponsive<T>(this IObservable<T> source, TimeSpan interval, IScheduler scheduler = null) { 
    if (source == null) 
    throw new ArgumentNullException(nameof(source)); 
    return Observable.Create<Sample<T>>(
    observer => { 
     var gate = new Object(); 
     var lastSampleValue = default(T); 
     var lastSampleTime = default(DateTime); 
     var sampleCount = 0; 
     var scheduledTask = new SerialDisposable(); 
     return new CompositeDisposable(
     source.Subscribe(
      value => { 
      lock (gate) { 
       var now = DateTime.UtcNow; 
       var elapsed = now - lastSampleTime; 
       if (elapsed >= interval) { 
       observer.OnNext(new Sample<T>(value, 1)); 
       lastSampleValue = value; 
       lastSampleTime = now; 
       sampleCount = 0; 
       } 
       else { 
       if (scheduledTask.Disposable == null) { 
        scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule(
        interval - elapsed, 
        () => { 
         lock (gate) { 
         if (sampleCount > 0) { 
          lastSampleTime = DateTime.UtcNow; 
          observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
          sampleCount = 0; 
         } 
         scheduledTask.Disposable = null; 
         } 
        } 
       ); 
       } 
       lastSampleValue = value; 
       sampleCount += 1; 
       } 
      } 
      }, 
      error => { 
      if (sampleCount > 0) 
       observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
      observer.OnError(error); 
      }, 
     () => { 
      if (sampleCount > 0) 
       observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
      observer.OnCompleted(); 
      } 
     ), 
     scheduledTask 
    ); 
    } 
); 
} 
Powiązane problemy