UPDATE Z 2015-10-30realizacja Akka-Strumień wolniej niż pojedyncze wykonania gwintowany

podstawie Roland Kuhn Awnser:

Akka strumieniami pomocą asynchronicznego komunikatu przechodzącej pomiędzy aktorów, wdrażaj etapy przetwarzania strumienia. Przesyłanie danych przez asynchroniczną granicę ma tu narzut: obliczenia wydają się zajmować tylko około 160ns (uzyskanych z pomiaru jednowątkowego ), podczas gdy rozwiązanie do przesyłania strumieniowego pobiera około 1μs na element, który jest zdominowany przez wiadomość przechodzi.

Innym nieporozumieniem jest to, że powiedzenie „strumień” oznacza równoległość: w kodzie wszystkich obliczeń przebiega kolejno w jednym etapie aktor (mapa ), więc nie ma korzyści można oczekiwać nad prymitywnym jednowątkowy rozwiązania.

W celu skorzystania z równoległości zapewnianej przez Akka Strumienie cię trzeba mieć wiele etapów przetwarzania że każdy wykonują zadania

1μs na element, patrz również docs.

Wprowadziłem kilka zmian. Mój kod wygląda teraz tak:

object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform) 

    val eventToFactorial = Flow[Event].map(SharedFunctions.transform2) 

    val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder => 
    import FlowGraph.Implicits._ 

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4)) 
    val mergeEvents = builder.add(Merge[Int](4)) 

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0) 
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1) 
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2) 
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3) 

    (dispatchTuple.in, mergeEvents.out) 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 

    def main(args: Array[String]) { 
    println("MultiThread started: " + SharedFunctions.startTime) 
    // in.via(eventChef).runWith(sink) 


ja nie wiem, czy mam coś zupełnie źle, ale nadal moja realizacja z Akka-strumieni jest znacznie wolniejszy (teraz nawet wolniej jak poprzednio), ale co się dowiedziałem to: Jeśli zwiększenie pracy, na przykład poprzez podział, implementacja z akka-strumieniami jest szybsza. Więc jeśli dobrze to rozumiem (popraw mnie inaczej), wydaje się, że mój przykład zawiera zbyt wiele narzutów. Czy otrzymasz tylko korzyść ze strumieni akka, jeśli kod musi wykonywać ciężką pracę?

Jestem stosunkowo nowy w obu scala & Akka strumienia. Napisałem mały projekt testowy, który tworzy pewne zdarzenia, dopóki licznik nie osiągnie określonego numeru. Dla każdego zdarzenia obliczana jest silnia dla jednego pola zdarzenia. Zaimplementowałem to dwa razy. Jednorazowo z akka-stream i raz bez akka-stream (single threaded) i porównał środowisko wykonawcze.

Nie spodziewałem się, że: Kiedy utworzę pojedyncze zdarzenie, środowisko wykonawcze obu programów będzie prawie takie samo. Ale jeśli stworzę 70 000 000 zdarzeń, implementacja bez akka-strumieni będzie znacznie szybsza.Oto moi Wyniki (następujące dane na podstawie 24 pomiarów):

  • pojedyncze zdarzenie bez AKKA strumieni: 403 (+ - 2) MS
  • pojedyncze wydarzenie Akka strumienie: 444 (+ -13) MS

  • 70Mio zdarzenia bez AKKA strumieni: 11778 (+ -70) MS

  • 70Mio zdarzeń ze akka-parowych: 75424 - 2959 (+) MS

Moje pytanie brzmi: co się dzieje? Dlaczego moja implementacja z akka-stream jest wolniejsza?

tutaj mój kod:

Wykonanie z Akka

object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 

    def main(args: Array[String]) { 
    import scala.concurrent.ExecutionContext.Implicits.global 
    println("MultiThread started: " + SharedFunctions.startTime) 
    in.via(flow).runWith(sink).onComplete(_ => endAkka()) 


Wdrożenie bez Akka

obiekt SingleThread {

def main(args: Array[String]) { 
    println("SingleThread started at: " + SharedFunctions.startTime) 
    val i = createEvent(0) 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now()); 
    println("Time: " + duration.getMillis + " || Data: " + i) 

    def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = { 
    if (count == SharedFunctions.maxEventCount) count 
    else { 
     val e = SharedFunctions.transform((randDate, name, age, myFloat)) 
     val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count, 
     DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
     createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f) 

    def createEvent(count: Int): Int = { 
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f) 


object SharedFunctions { 
    val maxEventCount = 70000000 
    val startTime = DateTime.now 

    def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4) 
    def transform2(e : Event) : Int = factorial(e.getAgeYrs) 

    def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100)/totalValue) 
    def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = { 
    val cProgress = calculatePercentage(fileSize, currentSize) 
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms") 

    private def factorialWorker(n1: Int, n2: Int): Int = { 
    if (n1 == 0) n2 
    else factorialWorker(n1 -1, n2*n1) 
    def factorial (n : Int): Int = { 
    factorialWorker(n, 1) 

Oprócz wyjaśnień Rolanda, z którymi się w pełni zgadzam, należy rozumieć, że strumienie akka to nie tylko współbieżne ramy programistyczne.Strumienie zapewniają również przeciwciśnienie, co oznacza, że ​​zdarzenia są generowane tylko przez Source, gdy istnieje zapotrzebowanie na ich przetworzenie w Sink. Ta komunikacja z żądaniem dodaje pewne koszty na każdym etapie przetwarzania.

W związku z tym porównanie pojedynczych nici i wielu nitek nie jest "jabłkiem na jabłka".

Jeśli chcesz uzyskać wydajność wielowątkowego przetwarzania surowego, to Futures/Actors są lepszym rozwiązaniem.


Prawda. Dobre zastosowania w przypadku strumieni obejmują strumieniowanie w czasie rzeczywistym lub strumieniowanie dużych ilości danych. Analizowanie ogromnych plików wideo bez odczytu wszystkiego do pamięci, pozyskiwanie milionów rekordów za pośrednictwem połączenia internetowego lub używanie obserwatorów plików w celu oczekiwania na odrzucenie pliku przed analizą składową to dobre przykłady. Ograniczają one złożoność środowisk, w których maksymalna szybkość nie jest konieczna i mogą być nierozsądne (dane przychodzą przez dłuższy czas lub wymagają milionów połączeń sieciowych) lub pomagają zmniejszyć złożoność w przypadku ogromnej ilości danych. –


, nie wiem, dlaczego nie była to akceptowana odpowiedź - poprzedza zaakceptowaną odpowiedź, która faktycznie zgadza się z nią i ma tylko drugorzędny punkt wtórny do tego odpowiedź. – doug

