2016-03-25 10 views
5

Mam strumień Akka i chcę, aby strumień wysyłał wiadomości w dół co około sekundę.Jak ograniczyć strumień Akka do wykonania i wysłać jedną wiadomość tylko raz na sekundę?

Próbowałem rozwiązać ten problem na dwa sposoby. Pierwszym sposobem było sprawienie, aby producent na początku strumienia wysyłał wiadomości tylko raz na sekundę, gdy do tego aktora przychodzi wiadomość Kontynuuj.

// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }

Działa to na krótką chwilę potem powódź dalej komunikaty pojawiają się w aktora ActorPublisher, zakładam (przypuszczenie, ale nie jestem pewien) z prądem poprzez ciśnienie wsteczne zainteresowanie wiadomości jako dalszy może zużywać szybko, ale Upstream nie produkuje w szybkim tempie. Tak więc ta metoda nie powiodła się.

Innym sposobem, w jaki próbowałem, było sterowanie ciśnieniem wstecznym, użyłem MaxInFlightRequestStrategy na końcu ActorSubscriber, aby ograniczyć liczbę wiadomości do 1 na sekundę. Działa to, ale wiadomości przychodzą w przybliżeniu po około trzech naraz, nie tylko po jednym na raz. Wygląda na to, że kontrola ciśnienia wstecznego nie zmienia natychmiast liczby wiadomości przychodzących w wiadomościach OR, które były już w kolejce w strumieniu i czekają na przetworzenie.

Problem polega na tym, że mogę mieć strumień Akka, który może przetworzyć tylko jedną wiadomość na sekundę?


odkryłem, że MaxInFlightRequestStrategy jest prawidłowy sposób to zrobić, ale należy ustawić wielkości wsadu do 1, jego wielkość partii jest domyślnym 5, który był przyczyną problemu znalazłem. Jest to również zbyt skomplikowany sposób na rozwiązanie problemu, teraz gdy patrzę na przesłaną tutaj odpowiedź.

+2

Czy za pomocą 'Source.tick'? – cmbaxter

+0

Nie, pozwól mi spojrzeć na to dzięki. – Phil

+0

możesz również spróbować 'przepustnicy'. –

Odpowiedz

10

Można albo poddać elementy procesowi dławiącemu, który szybko wywróci ciśnienie, albo użyć kombinacji tick i zip.

Rozwiązanie pięść byłoby tak:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val throttlingFlow = Flow[Long].throttle(
    // how many elements do you allow 
    elements = 1, 
    // in what unit of time 
    per = 1.second, 
    maximumBurst = 0, 
    // you can also set this to Enforcing, but then your 
    // stream will collapse if exceeding the number of elements/s 
    mode = ThrottleMode.Shaping 
) 

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println)) 

Drugim rozwiązaniem byłoby tak:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println)) 
Powiązane problemy