2016-02-17 6 views
14

Używam strumieni akka graphDSL do utworzenia działającego wykresu. Nie ma błędów kompilacji podczas wlotu/wylotu komponentów strumienia. Środowisko wykonawcze zgłasza następujący błąd:Błąd w tworzeniu wykresu: wymaganie nie powiodło się: Wejścia [] i wyjścia [] muszą odpowiadać wlotom [w] i wylotom [out]

Jakieś pomysły, które należy zweryfikować, aby uruchomić?

requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out] 
at scala.Predef$.require(Predef.scala:219) 
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168) 
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390) 
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18) 
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813) 
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109) 
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65) 
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39) 
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13) 

Struktura wykres:

source ~> flowRate ~> render ~> platformPartition.in 
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0) 
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1) 
merger.out ~> evtCreator ~> Sink.ignore 
+0

Czy możesz zamieścić swój wykres? – manub

+1

Zaktualizowałem pytanie o strukturę wykresu. Wszystkie parametry źródła/przepływu/zagłębienia pasują do wejścia/wyjścia. – phantomastray

+0

Typy 'render',' platformPartition', 'merger' i' evtCreator' również mogą być przydatne. – manub

Odpowiedz

14

Masz nieużywany wlot lub wylot (jeden z przepływów czy coś nie jest połączony ze wszystkich stron). Oto kilka przykładów:

to działa:

val workingFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val intFlow = b.add(Flow[Int]) 
    FlowShape(intFlow.in, intFlow.out) 
    }) 

Poniższy kod generuje błąd podobny do Twojego, bo ma całą niewykorzystaną przepływ:

val buggyFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val intFlow = b.add(Flow[Int]) 
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow is unused 
    FlowShape(intFlow.in, intFlow.out) 
    }) 

nieco bardziej skomplikowany przykład: Tutaj nie ma całego nieużywanego przepływu, jest tylko nieużywany wylot. Występuje ten sam błąd:

val buggyFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[Int](2)) 
    val intFlow = b.add(Flow[Int]) 
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow's outlet isn't used 

    broadcast ~> intFlow 
    broadcast ~> unusedFlow 

    FlowShape(broadcast.in, intFlow.out) 
    }) 
+0

tak, pomyśleliśmy, że już dawno temu. Komunikat o błędzie jest nieintuicyjny. Powinienem zaktualizować to pytanie. :) Dzięki, choć za patrzenie w to. – phantomastray

Powiązane problemy