2013-04-28 16 views
14

Czy są jakieś dobre samouczki/objaśnienia dotyczące korzystania z magistrali zdarzeń w programie akka? Przeczytałem dokumentację Akka, ale trudno mi zrozumieć, jak używać autobusu zdarzeńPrzewodnik po magistrali zdarzeń Akka

+0

http://www.kotancode.com/2014/02/12/using-the-akka-event-bus/ – AnonGeek

+1

Nie jest konstruktywny? Gdzie jeszcze mogę znaleźć odpowiedzi na takie pytania? –

Odpowiedz

39

Nie jestem pewien, czy istnieją dobre tutoriale, ale mogę podać szybki przykład możliwy przypadek użytkownika, w którym użycie strumienia zdarzeń może być pomocne. Jednak na wysokim poziomie strumień zdarzeń jest dobrym mechanizmem spełniającym wymagania pub/sub, które może posiadać Twoja aplikacja. Załóżmy, że masz przypadek, w którym aktualizujesz saldo użytkownika w swoim systemie. Dostęp do salda jest często dostępny, więc zdecydowałeś się na buforowanie go dla lepszej wydajności. Po zaktualizowaniu salda należy również sprawdzić, czy użytkownik przekroczył próg z saldem, a jeśli tak, wyślij je e-mailem. Nie chcesz, aby spadek pamięci podręcznej lub kontrola wartości progowej wagi były bezpośrednio powiązane z aktualizacją aktualizacji głównej, ponieważ mogą być obciążone i spowalniają odpowiedź użytkownika. Można modelować ten konkretny zestaw wymagań tak:

//Message and event classes 
case class UpdateAccountBalance(userId:Long, amount:Long) 
case class BalanceUpdated(userId:Long) 

//Actor that performs account updates 
class AccountManager extends Actor{ 
    val dao = new AccountManagerDao 

    def receive = { 
    case UpdateAccountBalance(userId, amount) => 
     val res = for(result <- dao.updateBalance(userId, amount)) yield{ 
     context.system.eventStream.publish(BalanceUpdated(userId)) 
     result     
     } 

     sender ! res 
    } 
} 

//Actor that manages a cache of account balance data 
class AccountCacher extends Actor{ 
    val cache = new AccountCache 

    override def preStart = { 
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated]) 
    } 

    def receive = { 
    case BalanceUpdated(userId) => 
     cache.remove(userId) 
    } 
} 

//Actor that checks balance after an update to warn of low balance 
class LowBalanceChecker extends Actor{ 
    val dao = new LowBalanceDao 

    override def preStart = { 
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated]) 
    } 

    def receive = { 
    case BalanceUpdated(userId) => 
     for{ 
     balance <- dao.getBalance(userId) 
     theshold <- dao.getBalanceThreshold(userId) 
     if (balance < threshold) 
     }{ 
     sendBalanceEmail(userId, balance) 
     } 
    } 
} 

W tym przykładzie AccountCacher i LowBalanceChecker aktorów zarówno zapisać się do eventStream według typu klasy dla zdarzenia BalanceUpdated. Jeśli to wydarzenie zostanie opublikowane w strumieniu, zostanie odebrane przez obie instancje aktorów. Następnie, w aktualizacji AccountManager, po zaktualizowaniu salda zwiększa ono wydarzenie BalanceUpdated dla użytkownika. Kiedy to się dzieje, równolegle, wiadomość ta jest dostarczana do skrzynek pocztowych zarówno dla AccountCacher, jak i dla LowBalanceChecker, co powoduje usunięcie salda z pamięci podręcznej i sprawdzenie progu konta oraz ewentualnie wysłanie e-maila.

Teraz można było po prostu wprowadzić bezpośrednie wywołania tell (!) do bezpośredniej komunikacji z tymi dwoma innymi aktorami, ale można argumentować, że może to być zbyt ścisłe połączenie tych dwóch "efektów ubocznych" aktualizacji salda, a te rodzaje szczegółów niekoniecznie należą do AccountManager. Jeśli masz warunek, który może spowodować dodatkowe rzeczy (kontrole, aktualizacje, itp.), Które muszą wystąpić wyłącznie jako efekty uboczne (nie będące częścią samego podstawowego przepływu biznesowego), wtedy strumień zdarzeń może być dobrym sposobem oddzielić wydarzenie i kto może być zmuszony zareagować na to wydarzenie.

+0

Dzięki. Tylko kilka pytań: 1) Czy tworzysz autobus zdarzeń, subskrybując jeden? Jak można "zniszczyć" autobus zdarzeń? 2) Czy istnieje konkretny aktor odpowiedzialny za autobus zdarzeń? 3) Zauważam, że nie zadeklarowałeś klasyfikatora, czy istnieje domyślny klasyfikator, który jest następnie wybrany? – Tsume

+1

"ActorSystem" będzie już mieć utworzoną dla niego szynę zdarzeń; nie trzeba samemu tworzyć. Ponieważ "ActorSystem" tworzy magistralę, zakładam, że opiekun root jest odpowiedzialny za magistralę. Nie jestem pewien, co masz na myśli przez pytanie 3; czy możesz wyjaśnić trochę więcej? – cmbaxter

+1

Po prostu przeczytałem ponownie [doc] (http://doc.akka.io/docs/akka/2.1.2/scala/event-bus.html), wydaje mi się, że źle zrozumiałem moje własne pytanie. Chciałem powiedzieć, że aktorzy w powyższym przykładzie są subskrybowani, aby otrzymać określoną wiadomość (BalanceUpdated). Co zrobić, aby subskrybować aktora do tematu, w którym różne wiadomości mogą być wysyłane – Tsume

10

Istnieje EventBus, który istnieje dla każdego ActorSystem. Ta EventBus jest określana jako Event Stream i można ją uzyskać, wywołując system.eventStream.

System ActorSystem używa strumienia zdarzeń dla wielu rzeczy, w tym logging, wysyłając Dead Letters i Cluster Events.

Możesz również użyć strumienia zdarzeń do własnych wymagań publikowania/subskrybowania. Na przykład strumień zdarzeń może być przydatny podczas testowania. Subskrybuj Test Kit 's testActor do strumienia zdarzeń dla niektórych zdarzeń (np. Zdarzeń rejestrowania) i można je expect. Może to być szczególnie przydatne, gdy nie chcesz wysyłać wiadomości do innego aktora, gdy coś się wydarzy, ale nadal musisz oczekiwać zdarzenia w teście.

Należy zauważyć, że strumień wydarzeń działa tylko w jednym kodzie ActorSystem. Jeśli korzystasz z wydarzeń zdalnych opublikowanych w strumieniu, nie przechodź domyślnie do systemów zdalnych (możesz jednak samemu dodać to wsparcie).

Teoretycznie można utworzyć oddzielne EventBus, jeśli nie chcesz korzystać ze strumienia zdarzeń.

Lepsze dokumenty dla magistrali zdarzeń są przetwarzane dla Akka 2.2, więc sprawdź ponownie, gdy zakończy się this ticket.

Powiązane problemy