Mam aplikację, która w niektórych punktach podnosi prawie 1000 zdarzeń w tym samym czasie. To, co chciałbym zrobić, to wsadowe zdarzenia do partii 50 elementów i rozpoczynanie ich przetwarzania co 10 sekund. Nie trzeba czekać na zakończenie zadania wsadowego przed rozpoczęciem nowego przetwarzania wsadowego.Rozszerzenia reaktywne: Przetwarzaj zdarzenia w partiach + dodaj opóźnienie między każdą serią
Na przykład:
10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))
Jakieś pomysły jak to osiągnąć? Przypuszczam, że Reactive Extensions to droga, ale inne rozwiązania też są akceptowalne.
starałem się rozpocząć tutaj:
var bufferedItems = eventAsObservable
.Buffer(15)
.Delay(TimeSpan.FromSeconds(5)
Ale zauważyłem, że opóźnienie nie działa jak się spodziewałem, a zamiast tego wszystkie partie rozpoczęła jednocześnie, chociaż 5 sekund z opóźnieniem.
Testowałem również metodę Window, ale nie zauważyłem żadnej różnicy w zachowaniu. Przypuszczam TimeSpan w oknie w rzeczywistości oznacza, że „podejmie wszelkie zdarzenia, które dzieje się w ciągu następnych 10 sekund.
var bufferedItems = eventAsObservable
.Window(TimeSpan.FromSeconds(10), 5)
.SelectMany(x => x)
.Subscribe(DoProcessing);
Używam Rx-Main 2.0.20304-beta
Chciałbym móc to zrobić więcej niż raz! Spędziłem trzy godziny próbując to rozgryźć. –