2015-10-29 9 views
9

UPDATE Z 2015-10-30realizacja Akka-Strumień wolniej niż pojedyncze wykonania gwintowany


podstawie Roland Kuhn Awnser:

Akka strumieniami pomocą asynchronicznego komunikatu przechodzącej pomiędzy aktorów, wdrażaj etapy przetwarzania strumienia. Przesyłanie danych przez asynchroniczną granicę ma tu narzut: obliczenia wydają się zajmować tylko około 160ns (uzyskanych z pomiaru jednowątkowego ), podczas gdy rozwiązanie do przesyłania strumieniowego pobiera około 1μs na element, który jest zdominowany przez wiadomość przechodzi.

Innym nieporozumieniem jest to, że powiedzenie „strumień” oznacza równoległość: w kodzie wszystkich obliczeń przebiega kolejno w jednym etapie aktor (mapa ), więc nie ma korzyści można oczekiwać nad prymitywnym jednowątkowy rozwiązania.

W celu skorzystania z równoległości zapewnianej przez Akka Strumienie cię trzeba mieć wiele etapów przetwarzania że każdy wykonują zadania

1μs na element, patrz również docs.

Wprowadziłem kilka zmian. Mój kod wygląda teraz tak:

object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform) 

    val eventToFactorial = Flow[Event].map(SharedFunctions.transform2) 

    val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder => 
    import FlowGraph.Implicits._ 

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4)) 
    val mergeEvents = builder.add(Merge[Int](4)) 

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0) 
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1) 
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2) 
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3) 

    (dispatchTuple.in, mergeEvents.out) 
    } 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 
    } 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 
    actorSystem.shutdown 
    actorSystem.awaitTermination 
    System.exit(-1) 
    } 

    def main(args: Array[String]) { 
    println("MultiThread started: " + SharedFunctions.startTime) 
    in.via(flow).runWith(sink) 
    // in.via(eventChef).runWith(sink) 
    } 

} 

ja nie wiem, czy mam coś zupełnie źle, ale nadal moja realizacja z Akka-strumieni jest znacznie wolniejszy (teraz nawet wolniej jak poprzednio), ale co się dowiedziałem to: Jeśli zwiększenie pracy, na przykład poprzez podział, implementacja z akka-strumieniami jest szybsza. Więc jeśli dobrze to rozumiem (popraw mnie inaczej), wydaje się, że mój przykład zawiera zbyt wiele narzutów. Czy otrzymasz tylko korzyść ze strumieni akka, jeśli kod musi wykonywać ciężką pracę?




Jestem stosunkowo nowy w obu scala & Akka strumienia. Napisałem mały projekt testowy, który tworzy pewne zdarzenia, dopóki licznik nie osiągnie określonego numeru. Dla każdego zdarzenia obliczana jest silnia dla jednego pola zdarzenia. Zaimplementowałem to dwa razy. Jednorazowo z akka-stream i raz bez akka-stream (single threaded) i porównał środowisko wykonawcze.

Nie spodziewałem się, że: Kiedy utworzę pojedyncze zdarzenie, środowisko wykonawcze obu programów będzie prawie takie samo. Ale jeśli stworzę 70 000 000 zdarzeń, implementacja bez akka-strumieni będzie znacznie szybsza.Oto moi Wyniki (następujące dane na podstawie 24 pomiarów):


  • pojedyncze zdarzenie bez AKKA strumieni: 403 (+ - 2) MS
  • pojedyncze wydarzenie Akka strumienie: 444 (+ -13) MS


  • 70Mio zdarzenia bez AKKA strumieni: 11778 (+ -70) MS

  • 70Mio zdarzeń ze akka-parowych: 75424 - 2959 (+) MS

Moje pytanie brzmi: co się dzieje? Dlaczego moja implementacja z akka-stream jest wolniejsza?

tutaj mój kod:

Wykonanie z Akka

object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 
    } 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 
    actorSystem.shutdown 
    actorSystem.awaitTermination 
    System.exit(-1) 
    } 

    def main(args: Array[String]) { 
    import scala.concurrent.ExecutionContext.Implicits.global 
    println("MultiThread started: " + SharedFunctions.startTime) 
    in.via(flow).runWith(sink).onComplete(_ => endAkka()) 
    } 

} 

Wdrożenie bez Akka

obiekt SingleThread {

def main(args: Array[String]) { 
    println("SingleThread started at: " + SharedFunctions.startTime) 
    println("0%") 
    val i = createEvent(0) 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now()); 
    println("Time: " + duration.getMillis + " || Data: " + i) 
    } 

    def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = { 
    if (count == SharedFunctions.maxEventCount) count 
    else { 
     val e = SharedFunctions.transform((randDate, name, age, myFloat)) 
     SharedFunctions.transform2(e) 
     val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count, 
     DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
     createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f) 
    } 
    } 

    def createEvent(count: Int): Int = { 
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f) 
    } 
} 

SharedFunctions

object SharedFunctions { 
    val maxEventCount = 70000000 
    val startTime = DateTime.now 

    def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4) 
    def transform2(e : Event) : Int = factorial(e.getAgeYrs) 

    def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100)/totalValue) 
    def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = { 
    val cProgress = calculatePercentage(fileSize, currentSize) 
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms") 
    cProgress 
    } 

    private def factorialWorker(n1: Int, n2: Int): Int = { 
    if (n1 == 0) n2 
    else factorialWorker(n1 -1, n2*n1) 
    } 
    def factorial (n : Int): Int = { 
    factorialWorker(n, 1) 
    } 
} 

Realizacja Event

/** 
* Autogenerated by Avro 
* 
* DO NOT EDIT DIRECTLY 
*/ 

@SuppressWarnings("all") 
@org.apache.avro.specific.AvroGenerated 
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { 
    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}"); 
    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } 
    @Deprecated public long timestampMS; 
    @Deprecated public CharSequence name; 
    @Deprecated public int ageYrs; 
    @Deprecated public float sizeCm; 

    /** 
    * Default constructor. Note that this does not initialize fields 
    * to their default values from the schema. If that is desired then 
    * one should use <code>newBuilder()</code>. 
    */ 
    public Event() {} 

    /** 
    * All-args constructor. 
    */ 
    public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) { 
    this.timestampMS = timestampMS; 
    this.name = name; 
    this.ageYrs = ageYrs; 
    this.sizeCm = sizeCm; 
    } 

    public org.apache.avro.Schema getSchema() { return SCHEMA$; } 
    // Used by DatumWriter. Applications should not call. 
    public Object get(int field$) { 
    switch (field$) { 
    case 0: return timestampMS; 
    case 1: return name; 
    case 2: return ageYrs; 
    case 3: return sizeCm; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 
    } 
    } 
    // Used by DatumReader. Applications should not call. 
    @SuppressWarnings(value="unchecked") 
    public void put(int field$, Object value$) { 
    switch (field$) { 
    case 0: timestampMS = (Long)value$; break; 
    case 1: name = (CharSequence)value$; break; 
    case 2: ageYrs = (Integer)value$; break; 
    case 3: sizeCm = (Float)value$; break; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 
    } 
    } 

    /** 
    * Gets the value of the 'timestampMS' field. 
    */ 
    public Long getTimestampMS() { 
    return timestampMS; 
    } 

    /** 
    * Sets the value of the 'timestampMS' field. 
    * @param value the value to set. 
    */ 
    public void setTimestampMS(Long value) { 
    this.timestampMS = value; 
    } 

    /** 
    * Gets the value of the 'name' field. 
    */ 
    public CharSequence getName() { 
    return name; 
    } 

    /** 
    * Sets the value of the 'name' field. 
    * @param value the value to set. 
    */ 
    public void setName(CharSequence value) { 
    this.name = value; 
    } 

    /** 
    * Gets the value of the 'ageYrs' field. 
    */ 
    public Integer getAgeYrs() { 
    return ageYrs; 
    } 

    /** 
    * Sets the value of the 'ageYrs' field. 
    * @param value the value to set. 
    */ 
    public void setAgeYrs(Integer value) { 
    this.ageYrs = value; 
    } 

    /** 
    * Gets the value of the 'sizeCm' field. 
    */ 
    public Float getSizeCm() { 
    return sizeCm; 
    } 

    /** 
    * Sets the value of the 'sizeCm' field. 
    * @param value the value to set. 
    */ 
    public void setSizeCm(Float value) { 
    this.sizeCm = value; 
    } 

    /** Creates a new Event RecordBuilder */ 
    public static Event.Builder newBuilder() { 
    return new Event.Builder(); 
    } 

    /** Creates a new Event RecordBuilder by copying an existing Builder */ 
    public static Event.Builder newBuilder(Event.Builder other) { 
    return new Event.Builder(other); 
    } 

    /** Creates a new Event RecordBuilder by copying an existing Event instance */ 
    public static Event.Builder newBuilder(Event other) { 
    return new Event.Builder(other); 
    } 

    /** 
    * RecordBuilder for Event instances. 
    */ 
    public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event> 
    implements org.apache.avro.data.RecordBuilder<Event> { 

    private long timestampMS; 
    private CharSequence name; 
    private int ageYrs; 
    private float sizeCm; 

    /** Creates a new Builder */ 
    private Builder() { 
     super(Event.SCHEMA$); 
    } 

    /** Creates a Builder by copying an existing Builder */ 
    private Builder(Event.Builder other) { 
     super(other); 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     } 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     } 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     } 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 
     } 
    } 

    /** Creates a Builder by copying an existing Event instance */ 
    private Builder(Event other) { 
      super(Event.SCHEMA$); 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     } 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     } 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     } 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 
     } 
    } 

    /** Gets the value of the 'timestampMS' field */ 
    public Long getTimestampMS() { 
     return timestampMS; 
    } 

    /** Sets the value of the 'timestampMS' field */ 
    public Event.Builder setTimestampMS(long value) { 
     validate(fields()[0], value); 
     this.timestampMS = value; 
     fieldSetFlags()[0] = true; 
     return this; 
    } 

    /** Checks whether the 'timestampMS' field has been set */ 
    public boolean hasTimestampMS() { 
     return fieldSetFlags()[0]; 
    } 

    /** Clears the value of the 'timestampMS' field */ 
    public Event.Builder clearTimestampMS() { 
     fieldSetFlags()[0] = false; 
     return this; 
    } 

    /** Gets the value of the 'name' field */ 
    public CharSequence getName() { 
     return name; 
    } 

    /** Sets the value of the 'name' field */ 
    public Event.Builder setName(CharSequence value) { 
     validate(fields()[1], value); 
     this.name = value; 
     fieldSetFlags()[1] = true; 
     return this; 
    } 

    /** Checks whether the 'name' field has been set */ 
    public boolean hasName() { 
     return fieldSetFlags()[1]; 
    } 

    /** Clears the value of the 'name' field */ 
    public Event.Builder clearName() { 
     name = null; 
     fieldSetFlags()[1] = false; 
     return this; 
    } 

    /** Gets the value of the 'ageYrs' field */ 
    public Integer getAgeYrs() { 
     return ageYrs; 
    } 

    /** Sets the value of the 'ageYrs' field */ 
    public Event.Builder setAgeYrs(int value) { 
     validate(fields()[2], value); 
     this.ageYrs = value; 
     fieldSetFlags()[2] = true; 
     return this; 
    } 

    /** Checks whether the 'ageYrs' field has been set */ 
    public boolean hasAgeYrs() { 
     return fieldSetFlags()[2]; 
    } 

    /** Clears the value of the 'ageYrs' field */ 
    public Event.Builder clearAgeYrs() { 
     fieldSetFlags()[2] = false; 
     return this; 
    } 

    /** Gets the value of the 'sizeCm' field */ 
    public Float getSizeCm() { 
     return sizeCm; 
    } 

    /** Sets the value of the 'sizeCm' field */ 
    public Event.Builder setSizeCm(float value) { 
     validate(fields()[3], value); 
     this.sizeCm = value; 
     fieldSetFlags()[3] = true; 
     return this; 
    } 

    /** Checks whether the 'sizeCm' field has been set */ 
    public boolean hasSizeCm() { 
     return fieldSetFlags()[3]; 
    } 

    /** Clears the value of the 'sizeCm' field */ 
    public Event.Builder clearSizeCm() { 
     fieldSetFlags()[3] = false; 
     return this; 
    } 

    @Override 
    public Event build() { 
     try { 
     Event record = new Event(); 
     record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]); 
     record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]); 
     record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]); 
     record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]); 
     return record; 
     } catch (Exception e) { 
     throw new org.apache.avro.AvroRuntimeException(e); 
     } 
    } 
    } 
} 
+0

Tylko w celu uzupełnienia, proszę podać definicję zdarzenia. Chcę spróbować zoptymalizować twój kod wielowątkowy ... –

+0

, czy dodałem go, –

Odpowiedz

11

Oprócz wyjaśnień Rolanda, z którymi się w pełni zgadzam, należy rozumieć, że strumienie akka to nie tylko współbieżne ramy programistyczne.Strumienie zapewniają również przeciwciśnienie, co oznacza, że ​​zdarzenia są generowane tylko przez Source, gdy istnieje zapotrzebowanie na ich przetworzenie w Sink. Ta komunikacja z żądaniem dodaje pewne koszty na każdym etapie przetwarzania.

W związku z tym porównanie pojedynczych nici i wielu nitek nie jest "jabłkiem na jabłka".

Jeśli chcesz uzyskać wydajność wielowątkowego przetwarzania surowego, to Futures/Actors są lepszym rozwiązaniem.

+0

Prawda. Dobre zastosowania w przypadku strumieni obejmują strumieniowanie w czasie rzeczywistym lub strumieniowanie dużych ilości danych. Analizowanie ogromnych plików wideo bez odczytu wszystkiego do pamięci, pozyskiwanie milionów rekordów za pośrednictwem połączenia internetowego lub używanie obserwatorów plików w celu oczekiwania na odrzucenie pliku przed analizą składową to dobre przykłady. Ograniczają one złożoność środowisk, w których maksymalna szybkość nie jest konieczna i mogą być nierozsądne (dane przychodzą przez dłuższy czas lub wymagają milionów połączeń sieciowych) lub pomagają zmniejszyć złożoność w przypadku ogromnej ilości danych. –

27

Akka Strumienie wykorzystuje asynchroniczne przekazywanie komunikatów pomiędzy podmiotami w celu realizacji strumieniowych etapy przetwarzania. Przesyłanie danych przez asynchroniczną granicę ma narzut, który tutaj widzisz: obliczenia wydają się zajmować tylko około 160ns (uzyskanych z pomiaru jednowątkowego), podczas gdy rozwiązanie do przesyłania strumieniowego zajmuje około 1μs na element, który jest zdominowany przez przekazywanie wiadomości.

Innym błędnym przekonaniem jest to, że powiedzenie "strumień" oznacza paralelizm: w twoim kodzie wszystkie obliczenia są wykonywane sekwencyjnie w jednym Actor (stadium map), więc nie można oczekiwać korzyści w stosunku do prymitywnego rozwiązania jednowątkowego.

Aby skorzystać z równoległości zapewnianej przez strumienie Akka, trzeba mieć wiele etapów przetwarzania, z których każdy wykonuje zadania o wartości> 1μs na element, patrz także: the docs.

+5

, nie wiem, dlaczego nie była to akceptowana odpowiedź - poprzedza zaakceptowaną odpowiedź, która faktycznie zgadza się z nią i ma tylko drugorzędny punkt wtórny do tego odpowiedź. – doug

Powiązane problemy