2016-05-10 12 views
8

Próbuję zintegrować przepływ strumieniowy z akka w mojej aplikacji Play 2.5. Pomysł polega na tym, że można przesyłać strumieniowo na zdjęciu, a następnie zapisywać je na dysku jako surowy plik, miniaturową wersję i wersję ze znakiem wodnym.Jak zmontować zlew w strumieniach Akka z wielu zapisów?

udało mi się uzyskać tej pracy za pomocą wykresu coś takiego:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray}) 
            .map(_.result().toArray) 

def toByteArray = Flow[ByteString].map(b => b.toArray) 

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder => 
    import GraphDSL.Implicits._ 
    val streamFan = builder.add(Broadcast[ByteString](3)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 
    val output = builder.add(Flow[ByteString].map(x => Success(Done))) 

    val rawFileSink = FileIO.toFile(file) 
    val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
    val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

    streamFan.out(0) ~> rawFileSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 
    streamFan.out(2) ~> output.in 

    byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink 
    byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink 

    FlowShape(streamFan.in, output.out) 
}) 

graph 

}

Potem drut go do mojego kontrolera odtwarzania przy użyciu akumulatora tak:

val sink = Sink.head[Try[Done]] 

val photoStorageParser = BodyParser { req => 
    Accumulator(sink).through(graph).map(Right.apply) 
} 

Problem polega na tym, że moje dwa przetwarzane zrzuty plików nie kończą się i otrzymuję zerowe rozmiary dla obu przetworzonych plików, ale nie dla surowych. Moja teoria mówi, że akumulator czeka tylko na jednym z wyjść mojego wentylatora, więc kiedy strumień wejściowy się zakończy i mój byteAccumulator wyda kompletny plik, do czasu zakończenia przetwarzania odtwarzanie ma zmaterializowaną wartość z wyjścia .

Moje pytania brzmią:
Czy jestem na dobrej drodze, jeśli chodzi o moje podejście? Jakie jest oczekiwane zachowanie podczas uruchamiania takiego wykresu? Jak mogę połączyć wszystkie moje umywalki, aby utworzyć jeden zlew?

+0

Uważam również, że powodem jest to, że przepływy nie są scalane po przetworzeniu. Czy wypróbowałeś 'Sink.combine' (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat

+0

Tak, dałem Sink.combine szansę, ale to jednoczy wiele zlewów, aby wysłać _do_ jak fan. Myślę, że szukam wachlarza, ale wygląda na to, że nie możesz tego zrobić, używając tylko źródeł pochłaniania! – Tompey

+0

Jest to podobny przykład: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Może powinieneś zwrócić 'SinkShape' zamiast' FlowShape', aby zadeklarować, że Twój strumień się zakończył? – devkat

Odpowiedz

7

Ok, po trochę pomocy (Andreas był na dobrej drodze), to znalazłeś się na to rozwiązanie, które załatwia sprawę:

val rawFileSink = FileIO.toFile(file) 
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) { 
    implicit builder => (rawSink, thumbSink, waterSink) => { 
    val streamFan = builder.add(Broadcast[ByteString](2)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 

    streamFan.out(0) ~> rawSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink 
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink 

    SinkShape(streamFan.in) 
    } 
}) 

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done))) 

Po czym to jest martwy każdy zadzwonić to od zaawansowania:

val photoStorageParser = BodyParser { req => 
    Accumulator(theSink).map(Right.apply) 
} 

def createImage(path: String) = Action(photoStorageParser) { req => 
    Created 
} 
+0

dzięki, po prostu miałem podobne zadanie i nie mogłem wymyślić, jak czekać na wszystkie zmaterializowane futures. Twoje rozwiązanie bardzo pomogło i działa! –

+0

Cześć! Co powiesz na zmienną liczbę zlewów dla kombinacji? – Alexander

Powiązane problemy