Mam strumień Akka i chcę, aby strumień wysyłał wiadomości w dół co około sekundę.Jak ograniczyć strumień Akka do wykonania i wysłać jedną wiadomość tylko raz na sekundę?
Próbowałem rozwiązać ten problem na dwa sposoby. Pierwszym sposobem było sprawienie, aby producent na początku strumienia wysyłał wiadomości tylko raz na sekundę, gdy do tego aktora przychodzi wiadomość Kontynuuj.
// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }
Działa to na krótką chwilę potem powódź dalej komunikaty pojawiają się w aktora ActorPublisher, zakładam (przypuszczenie, ale nie jestem pewien) z prądem poprzez ciśnienie wsteczne zainteresowanie wiadomości jako dalszy może zużywać szybko, ale Upstream nie produkuje w szybkim tempie. Tak więc ta metoda nie powiodła się.
Innym sposobem, w jaki próbowałem, było sterowanie ciśnieniem wstecznym, użyłem MaxInFlightRequestStrategy
na końcu ActorSubscriber
, aby ograniczyć liczbę wiadomości do 1 na sekundę. Działa to, ale wiadomości przychodzą w przybliżeniu po około trzech naraz, nie tylko po jednym na raz. Wygląda na to, że kontrola ciśnienia wstecznego nie zmienia natychmiast liczby wiadomości przychodzących w wiadomościach OR, które były już w kolejce w strumieniu i czekają na przetworzenie.
Problem polega na tym, że mogę mieć strumień Akka, który może przetworzyć tylko jedną wiadomość na sekundę?
odkryłem, że MaxInFlightRequestStrategy
jest prawidłowy sposób to zrobić, ale należy ustawić wielkości wsadu do 1, jego wielkość partii jest domyślnym 5, który był przyczyną problemu znalazłem. Jest to również zbyt skomplikowany sposób na rozwiązanie problemu, teraz gdy patrzę na przesłaną tutaj odpowiedź.
Czy za pomocą 'Source.tick'? – cmbaxter
Nie, pozwól mi spojrzeć na to dzięki. – Phil
możesz również spróbować 'przepustnicy'. –