Próbuję zintegrować przepływ strumieniowy z akka w mojej aplikacji Play 2.5. Pomysł polega na tym, że można przesyłać strumieniowo na zdjęciu, a następnie zapisywać je na dysku jako surowy plik, miniaturową wersję i wersję ze znakiem wodnym.Jak zmontować zlew w strumieniach Akka z wielu zapisów?
udało mi się uzyskać tej pracy za pomocą wykresu coś takiego:
val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
.map(_.result().toArray)
def toByteArray = Flow[ByteString].map(b => b.toArray)
val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
val streamFan = builder.add(Broadcast[ByteString](3))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
val output = builder.add(Flow[ByteString].map(x => Success(Done)))
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
streamFan.out(0) ~> rawFileSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
streamFan.out(2) ~> output.in
byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
FlowShape(streamFan.in, output.out)
})
graph
}
Potem drut go do mojego kontrolera odtwarzania przy użyciu akumulatora tak:
val sink = Sink.head[Try[Done]]
val photoStorageParser = BodyParser { req =>
Accumulator(sink).through(graph).map(Right.apply)
}
Problem polega na tym, że moje dwa przetwarzane zrzuty plików nie kończą się i otrzymuję zerowe rozmiary dla obu przetworzonych plików, ale nie dla surowych. Moja teoria mówi, że akumulator czeka tylko na jednym z wyjść mojego wentylatora, więc kiedy strumień wejściowy się zakończy i mój byteAccumulator wyda kompletny plik, do czasu zakończenia przetwarzania odtwarzanie ma zmaterializowaną wartość z wyjścia .
Moje pytania brzmią:
Czy jestem na dobrej drodze, jeśli chodzi o moje podejście? Jakie jest oczekiwane zachowanie podczas uruchamiania takiego wykresu? Jak mogę połączyć wszystkie moje umywalki, aby utworzyć jeden zlew?
Uważam również, że powodem jest to, że przepływy nie są scalane po przetworzeniu. Czy wypróbowałeś 'Sink.combine' (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat
Tak, dałem Sink.combine szansę, ale to jednoczy wiele zlewów, aby wysłać _do_ jak fan. Myślę, że szukam wachlarza, ale wygląda na to, że nie możesz tego zrobić, używając tylko źródeł pochłaniania! – Tompey
Jest to podobny przykład: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Może powinieneś zwrócić 'SinkShape' zamiast' FlowShape', aby zadeklarować, że Twój strumień się zakończył? – devkat