2016-02-25 16 views
10

Dlaczego jest wyjątek wDlaczego Akka Streams połyka moje wyjątki?

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 

object TestExceptionHandling { 
    def main(args: Array[String]): Unit = { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer()(defaultActorSystem) 

    Source(List(1, 2, 3)).map { i => 
     if (i == 2) { 
     throw new RuntimeException("Please, don't swallow me!") 
     } else { 
     i 
     } 
    }.runForeach { i => 
     println(s"Received $i") 
    } 
    } 
} 

zignorowane? Widzę, że strumień zostaje zatrzymany po wydrukowaniu Received 1, ale nic nie jest rejestrowane. Zauważ, że problemem nie jest ogólnie konfiguracja rejestrowania, ponieważ widzę dużo danych wyjściowych, jeśli ustawię akka.log-config-on-start = on w moim pliku application.conf.

+1

pan rzuca wyjątek z dala od zignorować wartość zwracaną 'runForeach'. –

+0

@ViktorKlang dzięki za wskazanie, właśnie zaktualizowałem swoją odpowiedź! –

Odpowiedz

11

Obecnie używam zwyczaj Supervision.Decider że pilnuje wyjątki są odpowiednio rejestrowane, które można skonfigurować tak:

val decider: Supervision.Decider = { e => 
    logger.error("Unhandled exception in stream", e) 
    Supervision.Stop 
} 

implicit val actorSystem = ActorSystem() 
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider) 
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem) 

Ponadto, jak zostało wskazane przez Vikor Klang, w przykładzie podanym powyżej wyjątek może również być „złapany” przez

Source(List(1, 2, 3)).map { i => 
    if (i == 2) { 
    throw new RuntimeException("Please, don't swallow me!") 
    } else { 
    i 
    } 
}.runForeach { i => 
    println(s"Received $i") 
}.onComplete { 
    case Success(_) => 
    println("Done") 
    case Failure(e) => 
    println(s"Failed with $e") 
} 

Uwaga jednak, że podejście to nie pomoże ci

Source(List(1, 2, 3)).map { i => 
    if (i == 2) { 
    throw new RuntimeException("Please, don't swallow me!") 
    } else { 
    i 
    } 
}.to(Sink.foreach { i => 
    println(s"Received $i") 
}).run() 

od run() zwraca Unit.

+1

'run()' zwraca tylko 'Unit', ponieważ domyślnie utrzymuje materialną wartość" lewej "strony (Keep.left). jeśli używałeś: ToMat (Sink.foreach (...)) (Keep.right), to znów by działało. –

4

Miałem podobne pytania, gdy zacząłem używać plików akk. Supervision.Decider pomaga, ale nie zawsze.

Niestety, nie przechwytuje wyjątków zgłoszonych w ActionPublisher. Widzę, że jest on obsługiwany, nazywa się go, ale nie osiąga Supervision.Decider. Działa z prostym strumieniem dostarczonym w dokumentacji.

Błędy również nie docierają do aktora, jeśli używam Sink.actorRef.

I dla dobra eksperymentu próbowałem Poniższy przykładowy

val stream = Source(0 to 5).map(100/_) 
stream.runWith(Sink.actorSubscriber(props)) 

W tym przypadku wyjątku został złapany przez decydentem, ale nigdy nie osiągnął aktor abonenta.

Ogólnie uważam, że to niespójne zachowanie. Nie mogę użyć jednego mechanizmu do obsługi błędów w strumieniu.

Mój oryginalny SO pytanie: Custom Supervision.Decider doesn't catch exception produced by ActorPublisher

I tu jest problem Akka gdzie jest śledzone: https://github.com/akka/akka/issues/18359

Powiązane problemy