2016-06-18 12 views
12

Chcę czytać wiele dużych plików, używając strumieni akka do przetwarzania każdej linii. Wyobraź sobie, że każdy klucz składa się z ("identyfikator" -> "wartość"). Jeśli zostanie znaleziony nowy "identyfikator", chcę zapisać go w bazie danych i zapisać jego "wartość", w przeciwnym razie, jeśli identyfikator został już znaleziony podczas przetwarzania strumienia linii, chcę zapisać tylko "wartość". W tym celu uważam, że potrzebuję pewnego rodzaju rekurencyjnego, stanowego przepływu, aby zachować identyfikatory, które zostały już znalezione na mapie. Myślę, że w tym przepływie otrzymam parę (newLine, contextWithIdentifiers).Strumienie Akka. Stan stanowy w przepływie

Właśnie zacząłem oglądać strumienie akka. Sądzę, że mogę sobie poradzić z obsługą bezpaństwowego przetwarzania, ale nie mam pojęcia, jak zachować "contextWithIdentifiers". Byłbym wdzięczny, gdyby ktoś nie wskazał mi właściwego kierunku.

Używam Scala.

+2

Doceniam, że o to pytasz. To taka prosta prośba, jednak znalezienie sensownej odpowiedzi przy pomocy przykładowego kodu wydaje się skomplikowane. To jedyny, który znalazłem! – akauppi

Odpowiedz

17

Hej, może coś takiego jak statefulMapConcat może ci w tym pomóc.

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import scala.util.Random._ 
import scala.math.abs 
import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

//encapsulating your input 
case class IdentValue(id: Int, value: String) 
//some random generated input 
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere")) 

val stateFlow = Flow[IdentValue].statefulMapConcat{() => 
    //state with already processed ids 
    var ids = Set.empty[Int] 
    identValue => if (ids.contains(identValue.id)) { 
    //save value to DB 
    println(identValue.value) 
    List(identValue) 
    } else { 
    //save both to database 
    println(identValue) 
    ids = ids + identValue.id 
    List(identValue) 
    } 
} 

Source(identValues) 
    .via(stateFlow) 
    .runWith(Sink.seq) 
    .onSuccess { case identValue => println(identValue) } 
+0

Dzięki za kod. Doceniłbym trochę więcej typów w środku, ponieważ jest zaangażowana fabryka() => .... Czy wiesz, dlaczego nie ma metody ".statefulMap"? – akauppi