2011-08-16 11 views
8

Próbuję uzyskać zachowanie odporne na błędy w akka Aktorzy. Pracuję nad kodem, który zależy od aktorów w systemie dostępnych dla długiego czasu przetwarzania. Zauważyłem, że moje przetwarzanie zatrzymuje się po kilku godzinach (powinno to zająć około 10 godzin) i niewiele się dzieje. Uważam, że moi Aktorzy nie wychodzą z wyjątków.Jak ustawić odporność na uszkodzenia akka Aktor?

Co muszę zrobić, aby ponownie uruchomić Aktorów na stałe? Spodziewam się, że można to zrobić z tej dokumentacji http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

pracuję z Akka 1.1.3 i scala 2,9

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached 
import akka.dispatch.Dispatchers 
import akka.routing.CyclicIterator 
import akka.routing.LoadBalancer 
import akka.config.Supervision._ 


object TestActor { 
    val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool") 
        .setCorePoolSize(100) 
        .setMaxPoolSize(100) 
        .build 
} 

class TestActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.dispatcher = TestActor.dispatcher 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure") 
     println("Actor: " + name + " Received: " + num) 
     //Thread.sleep(100) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    //callback method for restart handling 
    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    //callback method for restart handling 
    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

trait CyclicLoadBalancing extends LoadBalancer { this: Actor => 
    val testActors: List[ActorRef] 
    val seq = new CyclicIterator[ActorRef](testActors) 
} 

trait TestActorManager extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000) 
    val testActors: List[ActorRef] 
    override def preStart = testActors foreach { self.startLink(_) } 
    override def postStop = { System.out.println("postStop") } 
} 


    object FaultTest { 
    def main(args : Array[String]) : Unit = { 
     println("starting FaultTest.main()") 
     val numOfActors = 5 
     val supervisor = actorOf(
     new TestActorManager with CyclicLoadBalancing { 
      val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i))); 
     } 
    ) 

     supervisor.start(); 

     println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length) 

     val testActor = Actor.registry.actorsFor(classOf[TestActor]).head 

     (1 until 200 toList) foreach { testActor ! _ } 

    } 
    } 

Ten kod konfiguruje 5 Aktorzy za LoadBalancer że po prostu wydrukować liczb całkowitych, które są wysłane do nich, z wyjątkiem tego, że rzucają wyjątki na parzyste liczby, aby symulować błędy. Liczby całkowite od 0 do 200 są wysyłane do tych podmiotów. Spodziewam się, że liczby nieparzyste uzyskają wynik, ale wszystko wydaje się wyłączać po kilku błędach na liczbach parzystych. Uruchomienie tego kodu z wyników SBT w tym wyjściu:

[info] Running FaultTest 
starting FaultTest.main() 
Loading config [akka.conf] from the application classpath. 
Number of Actors: 5 
Actor: 2 Received: 1 
Actor: 2 Received: 9 
Actor: 1 Received: 3 
Actor: 3 Received: 7 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM 

Co myślę tu się dzieje jest to, że zaczynają 5 aktorów, a pierwsze 5 parzyste je z rynku i nie są one coraz wznowiona.

W jaki sposób można zmienić ten kod, aby aktorzy odzyskiwali ważność od wyjątków?

Spodziewam się, że wydrukowałoby to wszystkie liczby nieparzyste od 1 do 200. Myślę, że każdy aktor zawodziłby na liczbach parzystych, ale zostałby ponownie uruchomiony z nienaruszoną skrzynką na wyjątki. Spodziewam się zobaczyć println z preRestart i postRestart. Co należy skonfigurować w tym przykładzie kodu, aby te rzeczy się zdarzyły?

Oto kilka dodatkowych założeń dotyczących akka i Aktorów, które mogą prowadzić do mojego nieporozumienia. Zakładam, że Aktor może zostać skonfigurowany z Supervisor lub faultHandler, tak że będzie on ponownie uruchomiony i będzie dostępny, gdy wyjątek zostanie zgłoszony podczas odbierania. Zakładam, że wiadomość wysłana do aktora zostanie utracona, jeśli zgłasza wyjątek podczas odbierania. Zakładam, że zostaną wywołane preRestart() i postRestart() na aktorze, który zgłasza wyjątek.

Przykładowy kod reprezentuje to, co staram się robić i jest oparta na Why is my Dispatching on Actors scaled down in Akka?

** Kolejny przykładowy kod **

Oto kolejny przykładowy kod, który jest bardziej proste. Zaczynam od jednego aktora, który rzuca wyjątki na liczby parzyste. Na drodze nie ma elementu równoważącego obciążenia ani innych rzeczy. Próbuję wydrukować informacje o aktorze. Czekam na zakończenie programu przez minutę po wysłaniu wiadomości do aktora i sprawdzeniu, co się dzieje.

Spodziewam się, że wydrukowałoby to liczby nieparzyste, ale wygląda na to, że Aktor siedzi z wiadomościami w swojej skrzynce pocztowej.

Czy ustawienie OneForOneStrategy jest nieprawidłowe? Czy muszę powiązać aktora z czymś? Czy ten rodzaj konfiguracji jest zasadniczo błędnie skierowany z mojej strony? Czy Dyspozytor musi być skonfigurowany z tolerancją na uszkodzenia w jakiś sposób? Czy mógłbym zepsuć wątki w Dispatcherze?

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.ActorRegistry 
import akka.config.Supervision._ 

class SingleActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000) 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure, where does this get logged?") 
     println("Actor: " + name + " Received: " + num) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

object TestSingleActor{ 

    def main(args : Array[String]) : Unit = { 
     println("starting TestSingleActor.main()") 

     val testActor = Actor.actorOf(new SingleActor(1)).start() 

     println("number of actors: " + registry.actors.size) 
     printAllActorsInfo 

     (1 until 20 toList) foreach { testActor ! _ } 

     for(i <- 1 until 120){ 
     Thread.sleep(500) 
     printAllActorsInfo 
     } 
    } 

    def printAllActorsInfo() ={ 
    registry.actors.foreach((a) => 
     println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b " 
       .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted))) 
    } 
} 

Dostaję wyjście jak:

[info] Running TestSingleActor 
starting TestSingleActor.main() 
Loading config [akka.conf] from the application classpath. 
number of actors: 1 
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1 
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ... 

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM 

Odpowiedz

5

Problem polegał na tym, że byłam z plikiem akka.conf. Używałem pliku 1.1.3 akka.conf, z wyjątkiem linii, która skonfigurowała obsługę zdarzeń.

kopalnia (złamane jeden):

event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

odniesienia 1.1.3 (ten, który działa):

event-handlers = ["akka.event.EventHandler$DefaultListener"] 

Z moich Event-ładowarki config liniowych restartuje Aktor nie zdarzają. Dzięki referencji 1.1.3 ponowne uruchomienie linii przebiega wspaniale.

Zrobiłem tę zmianę na podstawie instrukcji http://akka.io/docs/akka/1.1.3/general/slf4j.html

Tak, pozbywając się sugestie w tej stronie i wracając do odniesienia 1.1.3 akka.conf udało mi się dostać odporne na uszkodzenia Aktorów.

1

Wierzę problem kończy się po wiadomości są wysyłane, nie starają się utrzymać przy życiu asynchroniczny aplikacji, a więc głównych zjazdów z gwintem i zabiera wszystko ze sobą.

+0

Jeśli dodaję Trhead.sleep (100000) na końcu głównej(), otrzymam: '[info] Uruchamianie testu błędów począwszy FaultTest.main() Ładowanie config [akka.conf] ze ścieżki klasy aplikacji. Liczba aktorów: 5 Aktor: 0 Otrzymano: 1 Aktor: 4 Otrzymano: 3 Aktor: 1 Otrzymano: 7 Aktor: 1 Otrzymano: 9', a wynik zostaje wstrzymany, ale dodatkowe numery nie są drukowane. Nie czekałem, aż aplikacja wyjdzie, ale po 30-40 sekundach nic nie było. Ponadto, jeśli usunę błąd, cyfry będą drukować bardzo szybko, w mniej niż 2 sekundy. –

Powiązane problemy