2014-12-21 18 views
7

Bawiłem się z Akka Persistence i napisałem następujący program, aby przetestować moje zrozumienie. Problem polega na tym, że za każdym razem, gdy uruchamiam ten program, uzyskuję różne wyniki. Prawidłowa odpowiedź to 49995000, ale nie zawsze tak rozumiem. Oczyściłem katalog dzienników między każdym przebiegiem, ale nie robi to żadnej różnicy. Czy ktoś może zobaczyć, co się dzieje? Program po prostu sumuje wszystkie liczby od 1 do n (gdzie n wynosi 9999 w poniższym kodzie).Trwałość Akka z potwierdzoną dostawą daje niespójne wyniki

Prawidłowa odpowiedź to: (n * (n + 1))/2. Dla n = 9999, który jest 49995000.

EDIT: wydaje się działać bardziej konsekwentnie niż z JDK 8 z JDK 7. Czy mogę być tylko przy użyciu JDK 8?

package io.github.ourkid.akka.aggregator.guaranteed 

import akka.actor.Actor 
import akka.actor.ActorPath 
import akka.actor.ActorSystem 
import akka.actor.Props 
import akka.actor.actorRef2Scala 
import akka.persistence.AtLeastOnceDelivery 
import akka.persistence.PersistentActor 

case class ExternalRequest(updateAmount : Int) 
case class CountCommand(deliveryId : Long, updateAmount : Int) 
case class Confirm(deliveryId : Long) 

sealed trait Evt 
case class CountEvent(updateAmount : Int) extends Evt 
case class ConfirmEvent(deliveryId : Long) extends Evt 

class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery { 

    override def persistenceId = "persistent-actor-ref-1" 

    override def receiveCommand : Receive = { 
    case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState) 
    case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState) 
    } 

    override def receiveRecover : Receive = { 
    case evt : Evt => updateState(evt) 
    } 

    def updateState(evt:Evt) = evt match { 
    case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount)) 
    case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId) 
    } 
} 

class FactorialActor extends Actor { 
    var count = 0 
    def receive = { 
    case CountCommand(deliveryId : Long, updateAmount:Int) => { 
     count = count + updateAmount 
     sender() ! Confirm(deliveryId) 
    } 
    case "print" => println(count) 
    } 
} 

object GuaranteedDeliveryTest extends App { 
    val system = ActorSystem() 

    val factorial = system.actorOf(Props[FactorialActor]) 

    val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path)) 

    import system.dispatcher 

    system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" } 

    for (i <- 1 to 9999) 
    delActor ! ExternalRequest(i) 



} 

plik SBT

name := "akka_aggregator" 

organization := "io.github.ourkid" 

version := "0.0.1-SNAPSHOT" 

scalaVersion := "2.11.4" 

scalacOptions ++= Seq("-unchecked", "-deprecation") 

resolvers ++= Seq(
    "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" 
) 

val Akka = "2.3.7" 
val Spray = "1.3.2" 

libraryDependencies ++= Seq(
    // Core Akka 
    "com.typesafe.akka" %% "akka-actor" % Akka, 
    "com.typesafe.akka" %% "akka-cluster" % Akka, 
    "com.typesafe.akka" %% "akka-persistence-experimental" % Akka, 
    "org.iq80.leveldb" % "leveldb" % "0.7", 
    "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", 

    // For future REST API 
    "io.spray" %% "spray-httpx" % Spray, 
    "io.spray" %% "spray-can" % Spray, 
    "io.spray" %% "spray-routing" % Spray, 
    "org.typelevel" %% "scodec-core" % "1.3.0", 

    // CSV reader  
    "net.sf.opencsv" % "opencsv" % "2.3", 

    // Logging 
    "com.typesafe.akka" %% "akka-slf4j" % Akka, 
    "ch.qos.logback" % "logback-classic" % "1.0.13", 

    // Testing 
    "org.scalatest" %% "scalatest" % "2.2.1" % "test", 
    "com.typesafe.akka" %% "akka-testkit" % Akka % "test", 
    "io.spray" %% "spray-testkit" % Spray % "test", 
    "org.scalacheck" %% "scalacheck" % "1.11.6" % "test" 
) 
fork := true 
mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor") 

plik application.conf

########################################## 
# Akka Persistence Reference Config File # 
########################################## 

akka { 

    # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs 
    # to STDOUT) 
    loggers = ["akka.event.slf4j.Slf4jLogger"] 

    # Log level used by the configured loggers (see "loggers") as soon 
    # as they have been started; before that, see "stdout-loglevel" 
    # Options: OFF, ERROR, WARNING, INFO, DEBUG 
    loglevel = "DEBUG" 

    # Log level for the very basic logger activated during ActorSystem startup. 
    # This logger prints the log messages to stdout (System.out). 
    # Options: OFF, ERROR, WARNING, INFO, DEBUG 
    stdout-loglevel = "INFO" 

    # Filter of log events that is used by the LoggingAdapter before 
    # publishing log events to the eventStream. 
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 

    # Protobuf serialization for persistent messages 
    actor { 

    serializers { 

     akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" 
     akka-persistence-message = "akka.persistence.serialization.MessageSerializer" 
    } 

    serialization-bindings { 

     "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot 
     "akka.persistence.serialization.Message" = akka-persistence-message 
    } 
    } 

    persistence { 

    journal { 

     # Maximum size of a persistent message batch written to the journal. 
     max-message-batch-size = 200 

     # Maximum size of a deletion batch written to the journal. 
     max-deletion-batch-size = 10000 

     # Path to the journal plugin to be used 
     plugin = "akka.persistence.journal.leveldb" 

     # In-memory journal plugin. 
     inmem { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.inmem.InmemJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.actor.default-dispatcher" 
     } 

     # LevelDB journal plugin. 
     leveldb { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.leveldb.LeveldbJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

     # Dispatcher for message replay. 
     replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" 

     # Storage location of LevelDB files. 
     dir = "journal" 

     # Use fsync on write 
     fsync = on 

     # Verify checksum on read. 
     checksum = off 

     # Native LevelDB (via JNI) or LevelDB Java port 
     native = on 
     # native = off 
     } 

     # Shared LevelDB journal plugin (for testing only). 
     leveldb-shared { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.actor.default-dispatcher" 

     # timeout for async journal operations 
     timeout = 10s 

     store { 

      # Dispatcher for shared store actor. 
      store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

      # Dispatcher for message replay. 
      replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

      # Storage location of LevelDB files. 
      dir = "journal" 

      # Use fsync on write 
      fsync = on 

      # Verify checksum on read. 
      checksum = off 

      # Native LevelDB (via JNI) or LevelDB Java port 
      native = on 
     } 
     } 
    } 

    snapshot-store { 

     # Path to the snapshot store plugin to be used 
     plugin = "akka.persistence.snapshot-store.local" 

     # Local filesystem snapshot store plugin. 
     local { 

     # Class name of the plugin. 
     class = "akka.persistence.snapshot.local.LocalSnapshotStore" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

     # Dispatcher for streaming snapshot IO. 
     stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" 

     # Storage location of snapshot files. 
     dir = "snapshots" 
     } 
    } 

    view { 

     # Automated incremental view update. 
     auto-update = on 

     # Interval between incremental updates 
     auto-update-interval = 5s 

     # Maximum number of messages to replay per incremental view update. Set to 
     # -1 for no upper limit. 
     auto-update-replay-max = -1 
    } 

    at-least-once-delivery { 
     # Interval between redelivery attempts 
     redeliver-interval = 5s 

     # Maximum number of unconfirmed messages that will be sent in one redelivery burst 
     redelivery-burst-limit = 10000 

     # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` 
     # message will be sent to the actor. 
     warn-after-number-of-unconfirmed-attempts = 5 

     # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is 
     # allowed to hold in memory. 
     max-unconfirmed-messages = 100000 
    } 

    dispatchers { 
     default-plugin-dispatcher { 
     type = PinnedDispatcher 
     executor = "thread-pool-executor" 
     } 
     default-replay-dispatcher { 
     type = Dispatcher 
     executor = "fork-join-executor" 
     fork-join-executor { 
      parallelism-min = 2 
      parallelism-max = 8 
     } 
     } 
     default-stream-dispatcher { 
     type = Dispatcher 
     executor = "fork-join-executor" 
     fork-join-executor { 
      parallelism-min = 2 
      parallelism-max = 8 
     } 
     } 
    } 
    } 
} 

Prawidłowe wyjście:

18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 
0 
18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
3974790 
24064453 
18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
49995000 
49995000 
49995000 
49995000 

Nieprawidłowa run:

17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started 
0 
17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
3727815 
22167811 
17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
49995000 
51084018 
51084018 
52316760 
52316760 
52316760 
52316760 
52316760 

Innym nieprawidłowy przebieg:

17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 
0 
17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
2982903 
17710176 
49347145 
17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
51704199 
51704199 
55107844 
55107844 
55107844 
55107844 

Odpowiedz

10

Używasz AtLeastOnceDelivery semantykę. Jak to powiedział here:

uwaga-najmniej raz dostawy oznacza, że ​​oryginalna wiadomość wysłać zamówienie nie zawsze jest zachowana, a docelowy może otrzymywać duplikaty wiadomości. Oznacza to, że semantyka nie pasują do tych z normalnym ActorRef wysłać operacja:

nie jest ona w temperaturze najczęściej raz kolejność wiadomość dostawy dla tej samej pary nadawca-odbiornik nie jest zachowana ze względu na możliwość ponownych wysłań po awarie i ponowne uruchamianie komunikatów docelowych są nadal dostarczane - do nowe wcielenie aktora Ta semantyka jest podobna do reprezentowanej przez ActorPath (patrz Cykl życia aktora), dlatego podczas dostarczania wiadomości należy podać ścieżkę, a nie referencję. Wiadomości są wysyłane do ścieżki z wyborem aktora.

Niektóre liczby mogą być odbierane więcej niż jeden raz. Możesz po prostu zignorować duplikaty liczb wewnątrz FactorialActor lub nie używać tego semantycznego.

Powiązane problemy