2016-07-18 9 views
6

Mam strumienie Akka Source, które chcę podzielić na dwa źródła zgodnie z predykatem.Podział źródła strumienia Akka na dwa

E.g. że źródło (typy uproszczony celowo)

val source: Source[Either[Throwable, String], NotUsed] = ??? 

I dwiema metodami:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ??? 
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ??? 

ja jak aby móc rozdzielać source stosownie do _.isRight orzeczenie i przechodzą w prawej części na handleSuccess metody i pozostawiono część do metody handleFailure.

Próbowałem używać splittera Broadcast, ale na końcu wymaga to Sink.

+0

Nie sądzę, że jest możliwe, aby podzielić się na dwa źródła źródeł w taki sposób, ponieważ te źródła mogą być dzielone następnie zmaterializował osobno i nie jest to absolutnie nie wiadomo, jak powinno wtedy działać. –

+0

Tak, rozumiem konsekwencje tego. Interesuje mnie alternatywny wzorzec do re-strukturyzacji kodu.Na przykład, mógłbym sprawić, by moje metody zwracały 'Sink's zamiast akceptowania' Source'. – Tvaroh

Odpowiedz

5

Chociaż można wybrać, po której stronie Source chcesz odzyskać pozycji z to nie jest możliwe stworzenie Source że daje dwa wyjścia, które jest to, co wydaje się byś ostatecznie chcą.

Biorąc pod uwagę GraphStage poniżej której zasadniczo dzieli lewo i prawo w wartości dwóch wyjść ...

/** 
    * Fans out left and right values of an either 
    * @tparam L left value type 
    * @tparam R right value type 
    */ 
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] { 
    import akka.stream.{Attributes, Outlet} 
    import akka.stream.stage.GraphStageLogic 

    override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut") 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 

    var out0demand = false 
    var out1demand = false 

    setHandler(shape.in, new InHandler { 
     override def onPush(): Unit = { 

     if (out0demand && out1demand) { 
      grab(shape.in) match { 
      case Left(l) => 
       out0demand = false 
       push(shape.out0, l) 
      case Right(r) => 
       out1demand = false 
       push(shape.out1, r) 
      } 
     } 
     } 
    }) 

    setHandler(shape.out0, new OutHandler { 
     @scala.throws[Exception](classOf[Exception]) 
     override def onPull(): Unit = { 
     if (!out0demand) { 
      out0demand = true 
     } 

     if (out0demand && out1demand) { 
      pull(shape.in) 
     } 
     } 
    }) 

    setHandler(shape.out1, new OutHandler { 
     @scala.throws[Exception](classOf[Exception]) 
     override def onPull(): Unit = { 
     if (!out1demand) { 
      out1demand = true 
     } 

     if (out0demand && out1demand) { 
      pull(shape.in) 
     } 
     } 
    }) 
    } 
} 

.. można przekierować je otrzymać tylko jedną stronę:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s => 
    import GraphDSL.Implicits._ 

    val eitherFanOut = b.add(new EitherFanOut[Throwable, String]) 

    s ~> eitherFanOut.in 
    eitherFanOut.out0 ~> Sink.ignore 

    SourceShape(eitherFanOut.out1) 
}) 

Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf) 

... lub prawdopodobnie bardziej pożądane, przekieruj je do dwóch oddzielnych Sink s:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) 
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) 

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) => 

    import GraphDSL.Implicits._ 

    val eitherFanOut = b.add(new EitherFanOut[Throwable, String]) 

    s ~> eitherFanOut.in 
    eitherFanOut.out0 ~> l.in 
    eitherFanOut.out1 ~> r.in 

    ClosedShape 
}) 


val r = flow.run() 
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf) 

(import i wstępna konfiguracja)

import akka.NotUsed 
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source} 
import akka.stream.stage.{GraphStage, InHandler, OutHandler} 
import akka.stream._ 
import akka.actor.ActorSystem 
import com.typesafe.config.ConfigFactory 

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 

import scala.concurrent.Await 
import scala.concurrent.duration.Duration 

val classLoader = getClass.getClassLoader 
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader) 
implicit val materializer = ActorMaterializer() 

val values: List[Either[Throwable, String]] = List(
    Right("B"), 
    Left(new Throwable), 
    Left(new RuntimeException), 
    Right("B"), 
    Right("C"), 
    Right("G"), 
    Right("I"), 
    Right("F"), 
    Right("T"), 
    Right("A") 
) 

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator) 
2

Z tego można użyć broadcast, następnie przesączyć i mapa strumieni w GraphDSL:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) 
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) 


val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) => 

     import GraphDSL.Implicits._ 

     val broadcast = b.add(Broadcast[Either[Throwable,String]](2)) 


     s ~> broadcast.in 
     broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> l.in 
     broadcast.out(1).filter(_.isRight).map(_.right.get) ~> r.in 


     ClosedShape 
    }) 


val r = flow.run() 
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf) 

I oczekiwać będzie w stanie uruchomić funkcje, które chcesz z poziomu mapy.

+3

Bit bezpieczniejszym wariantem '.filter (_. IsLeft) .map (_. Left.get)' jest '.collect {case Left (l) => l}' I tak dla gałęzi _right_ – Alexander

4

Zaimplementowano to w akka-stream-contrib jako PartitionWith. Dodaj tę zależność do SBT pociągnąć go do projektu:

// latest version available on https://github.com/akka/akka-stream-contrib/releases libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.8"

PartitionWith ma kształt Broadcast(2), ale potencjalnie różnych typów dla każdego z dwóch gniazd. Podajesz mu predykat, który można zastosować do każdego elementu, i w zależności od wyniku zostają przekierowani do odpowiedniego gniazdka. Następnie możesz odpowiednio podłączyć Sink lub Flow do każdego z tych gniazd. Opierając się na cessationoftime's example, z Broadcast zastąpione z PartitionWith:

val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty 
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) 
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) 

val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink) 
            ((_, _, _)) { implicit b => (s, l, r) => 

    import GraphDSL.Implicits._ 

    val pw = b.add(
    PartitionWith.apply[Either[Throwable, String], Throwable, String](identity) 
) 

    eitherSource ~> pw.in 
    pw.out0 ~> leftSink 
    pw.out1 ~> rightSink 

    ClosedShape 
}) 

val r = flow.run() 
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)