2012-06-07 7 views
10

Mam aplikację, która w niektórych punktach podnosi prawie 1000 zdarzeń w tym samym czasie. To, co chciałbym zrobić, to wsadowe zdarzenia do partii 50 elementów i rozpoczynanie ich przetwarzania co 10 sekund. Nie trzeba czekać na zakończenie zadania wsadowego przed rozpoczęciem nowego przetwarzania wsadowego.Rozszerzenia reaktywne: Przetwarzaj zdarzenia w partiach + dodaj opóźnienie między każdą serią

Na przykład:

10:00:00: 10000 new events received 
10:00:00: StartProcessing (events.Take(50)) 
10:00:10: StartProcessing (events.Skip(50).Take(50)) 
10:00:15: StartProcessing (events.Skip(100).Take(50)) 

Jakieś pomysły jak to osiągnąć? Przypuszczam, że Reactive Extensions to droga, ale inne rozwiązania też są akceptowalne.

starałem się rozpocząć tutaj:

 var bufferedItems = eventAsObservable 
      .Buffer(15) 
      .Delay(TimeSpan.FromSeconds(5) 

Ale zauważyłem, że opóźnienie nie działa jak się spodziewałem, a zamiast tego wszystkie partie rozpoczęła jednocześnie, chociaż 5 sekund z opóźnieniem.

Testowałem również metodę Window, ale nie zauważyłem żadnej różnicy w zachowaniu. Przypuszczam TimeSpan w oknie w rzeczywistości oznacza, że ​​„podejmie wszelkie zdarzenia, które dzieje się w ciągu następnych 10 sekund.

 var bufferedItems = eventAsObservable 
      .Window(TimeSpan.FromSeconds(10), 5) 
      .SelectMany(x => x) 
      .Subscribe(DoProcessing); 

Używam Rx-Main 2.0.20304-beta

Odpowiedz

20

Gdybyś nie wolą spać wątki, można to zrobić:

var tick = Observable.Interval(TimeSpan.FromSeconds(5)); 

eventAsObservable 
.Buffer(50) 
.Zip(tick, (res, _) => res) 
.Subscribe(DoProcessing); 
+1

Chciałbym móc to zrobić więcej niż raz! Spędziłem trzy godziny próbując to rozgryźć. –

1

Spróbuj tego:

var bufferedItems = eventAsObservable 
     .Buffer(50) 
     .Do(_ => { Thread.Sleep(10000); }); 
Powiązane problemy