2014-12-21

Akka Persistence with confirmed delivery gives inconsistent results

I have been playing with Akka Persistence and wrote the following program to test my understanding. The problem is that I get different results each time I run it. The correct answer is 49995000, but I don't always get it. I have cleaned out the journal directory between runs, but it makes no difference. Can anyone see what is going wrong? The program simply sums all the numbers from 1 to n (where n is 9999 in the code below).

The correct answer is n * (n + 1) / 2, which for n = 9999 is 49995000.
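As a quick sanity check (not part of the original program), the closed form can be compared against a brute-force sum:

```scala
object SumCheck extends App {
  val n = 9999
  val direct = (1 to n).sum        // brute-force sum of 1..n
  val closedForm = n * (n + 1) / 2 // Gauss's closed form
  println(s"direct = $direct, closed form = $closedForm") // both 49995000
}
```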

EDIT: It seems to work more consistently with JDK 8 than with JDK 7. Should I be using JDK 8 only?

package io.github.ourkid.akka.aggregator.guaranteed 

import akka.actor.Actor 
import akka.actor.ActorPath 
import akka.actor.ActorSystem 
import akka.actor.Props 
import akka.actor.actorRef2Scala 
import akka.persistence.AtLeastOnceDelivery 
import akka.persistence.PersistentActor 

case class ExternalRequest(updateAmount : Int) 
case class CountCommand(deliveryId : Long, updateAmount : Int) 
case class Confirm(deliveryId : Long) 

sealed trait Evt 
case class CountEvent(updateAmount : Int) extends Evt 
case class ConfirmEvent(deliveryId : Long) extends Evt 

class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery { 

    override def persistenceId = "persistent-actor-ref-1" 

    override def receiveCommand : Receive = { 
    case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState) 
    case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState) 
    } 

    override def receiveRecover : Receive = { 
    case evt : Evt => updateState(evt) 
    } 

    def updateState(evt:Evt) = evt match { 
    case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount)) 
    case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId) 
    } 
} 

class FactorialActor extends Actor { 
    var count = 0 
    def receive = { 
    case CountCommand(deliveryId : Long, updateAmount:Int) => { 
     count = count + updateAmount 
     sender() ! Confirm(deliveryId) 
    } 
    case "print" => println(count) 
    } 
} 

object GuaranteedDeliveryTest extends App { 
    val system = ActorSystem() 

    val factorial = system.actorOf(Props[FactorialActor]) 

    val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path)) 

    import system.dispatcher 
    import scala.concurrent.duration._ 

    // Periodically print the running total. 
    system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" } 

    for (i <- 1 to 9999) 
    delActor ! ExternalRequest(i) 
} 

The SBT file:

name := "akka_aggregator" 

organization := "io.github.ourkid" 

version := "0.0.1-SNAPSHOT" 

scalaVersion := "2.11.4" 

scalacOptions ++= Seq("-unchecked", "-deprecation") 

resolvers ++= Seq(
    "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" 
) 

val Akka = "2.3.7" 
val Spray = "1.3.2" 

libraryDependencies ++= Seq(
    // Core Akka 
    "com.typesafe.akka" %% "akka-actor" % Akka, 
    "com.typesafe.akka" %% "akka-cluster" % Akka, 
    "com.typesafe.akka" %% "akka-persistence-experimental" % Akka, 
    "org.iq80.leveldb" % "leveldb" % "0.7", 
    "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", 

    // For future REST API 
    "io.spray" %% "spray-httpx" % Spray, 
    "io.spray" %% "spray-can" % Spray, 
    "io.spray" %% "spray-routing" % Spray, 
    "org.typelevel" %% "scodec-core" % "1.3.0", 

    // CSV reader  
    "net.sf.opencsv" % "opencsv" % "2.3", 

    // Logging 
    "com.typesafe.akka" %% "akka-slf4j" % Akka, 
    "ch.qos.logback" % "logback-classic" % "1.0.13", 

    // Testing 
    "org.scalatest" %% "scalatest" % "2.2.1" % "test", 
    "com.typesafe.akka" %% "akka-testkit" % Akka % "test", 
    "io.spray" %% "spray-testkit" % Spray % "test", 
    "org.scalacheck" %% "scalacheck" % "1.11.6" % "test" 
) 
fork := true 
mainClass in assembly := Some("io.github.ourkid.akka.aggregator.guaranteed.GuaranteedDeliveryTest") 

The application.conf file:

########################################## 
# Akka Persistence Reference Config File # 
########################################## 

akka { 

    # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs 
    # to STDOUT) 
    loggers = ["akka.event.slf4j.Slf4jLogger"] 

    # Log level used by the configured loggers (see "loggers") as soon 
    # as they have been started; before that, see "stdout-loglevel" 
    # Options: OFF, ERROR, WARNING, INFO, DEBUG 
    loglevel = "DEBUG" 

    # Log level for the very basic logger activated during ActorSystem startup. 
    # This logger prints the log messages to stdout (System.out). 
    # Options: OFF, ERROR, WARNING, INFO, DEBUG 
    stdout-loglevel = "INFO" 

    # Filter of log events that is used by the LoggingAdapter before 
    # publishing log events to the eventStream. 
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 

    # Protobuf serialization for persistent messages 
    actor { 

    serializers { 

     akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" 
     akka-persistence-message = "akka.persistence.serialization.MessageSerializer" 
    } 

    serialization-bindings { 

     "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot 
     "akka.persistence.serialization.Message" = akka-persistence-message 
    } 
    } 

    persistence { 

    journal { 

     # Maximum size of a persistent message batch written to the journal. 
     max-message-batch-size = 200 

     # Maximum size of a deletion batch written to the journal. 
     max-deletion-batch-size = 10000 

     # Path to the journal plugin to be used 
     plugin = "akka.persistence.journal.leveldb" 

     # In-memory journal plugin. 
     inmem { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.inmem.InmemJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.actor.default-dispatcher" 
     } 

     # LevelDB journal plugin. 
     leveldb { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.leveldb.LeveldbJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

     # Dispatcher for message replay. 
     replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" 

     # Storage location of LevelDB files. 
     dir = "journal" 

     # Use fsync on write 
     fsync = on 

     # Verify checksum on read. 
     checksum = off 

     # Native LevelDB (via JNI) or LevelDB Java port 
     native = on 
     # native = off 
     } 

     # Shared LevelDB journal plugin (for testing only). 
     leveldb-shared { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.actor.default-dispatcher" 

     # timeout for async journal operations 
     timeout = 10s 

     store { 

      # Dispatcher for shared store actor. 
      store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

      # Dispatcher for message replay. 
      replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

      # Storage location of LevelDB files. 
      dir = "journal" 

      # Use fsync on write 
      fsync = on 

      # Verify checksum on read. 
      checksum = off 

      # Native LevelDB (via JNI) or LevelDB Java port 
      native = on 
     } 
     } 
    } 

    snapshot-store { 

     # Path to the snapshot store plugin to be used 
     plugin = "akka.persistence.snapshot-store.local" 

     # Local filesystem snapshot store plugin. 
     local { 

     # Class name of the plugin. 
     class = "akka.persistence.snapshot.local.LocalSnapshotStore" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

     # Dispatcher for streaming snapshot IO. 
     stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" 

     # Storage location of snapshot files. 
     dir = "snapshots" 
     } 
    } 

    view { 

     # Automated incremental view update. 
     auto-update = on 

     # Interval between incremental updates 
     auto-update-interval = 5s 

     # Maximum number of messages to replay per incremental view update. Set to 
     # -1 for no upper limit. 
     auto-update-replay-max = -1 
    } 

    at-least-once-delivery { 
     # Interval between redelivery attempts 
     redeliver-interval = 5s 

     # Maximum number of unconfirmed messages that will be sent in one redelivery burst 
     redelivery-burst-limit = 10000 

     # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` 
     # message will be sent to the actor. 
     warn-after-number-of-unconfirmed-attempts = 5 

     # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is 
     # allowed to hold in memory. 
     max-unconfirmed-messages = 100000 
    } 

    dispatchers { 
     default-plugin-dispatcher { 
     type = PinnedDispatcher 
     executor = "thread-pool-executor" 
     } 
     default-replay-dispatcher { 
     type = Dispatcher 
     executor = "fork-join-executor" 
     fork-join-executor { 
      parallelism-min = 2 
      parallelism-max = 8 
     } 
     } 
     default-stream-dispatcher { 
     type = Dispatcher 
     executor = "fork-join-executor" 
     fork-join-executor { 
      parallelism-min = 2 
      parallelism-max = 8 
     } 
     } 
    } 
    } 
} 

Correct output:

18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 
0 
18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
3974790 
24064453 
18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
49995000 
49995000 
49995000 
49995000 

An incorrect run:

17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started 
0 
17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
3727815 
22167811 
17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
49995000 
51084018 
51084018 
52316760 
52316760 
52316760 
52316760 
52316760 

Another incorrect run:

17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 
0 
17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
2982903 
17710176 
49347145 
17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
51704199 
51704199 
55107844 
55107844 
55107844 
55107844 

Answer

You are using AtLeastOnceDelivery semantics. As stated here:

Note: At-least-once delivery implies that original message send order is not always preserved and the destination may receive duplicate messages. That means that the semantics do not match those of a normal ActorRef send operation:

- it is not at-most-once delivery
- message order for the same sender-receiver pair is not preserved, due to possible resends
- after a crash and restart of the destination, messages are still delivered to the new actor incarnation

These semantics are similar to what an ActorPath represents (see Actor Lifecycle). Therefore you need to supply a path and not a reference when delivering messages; the messages are sent to the path with an actor selection.

So some numbers may be received more than once. You can simply ignore duplicate numbers inside FactorialActor, or not use these semantics.
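One way to ignore duplicates is to remember which deliveryIds have already been processed and skip re-deliveries. Sketched below without Akka for clarity: DedupCounter is a hypothetical stand-in for the state FactorialActor could keep (in the real actor you would still reply with Confirm(deliveryId) either way, so the sender stops redelivering).

```scala
// Hypothetical duplicate-tolerant counter. Each delivery carries a unique
// deliveryId; a redelivery reuses the same id, so tracking processed ids
// makes the sum idempotent under at-least-once delivery.
final class DedupCounter {
  private var processed = Set.empty[Long]
  private var count = 0L

  def receive(deliveryId: Long, updateAmount: Int): Unit =
    if (!processed.contains(deliveryId)) {
      processed += deliveryId   // first time we see this id: record it
      count += updateAmount     // and apply the update exactly once
    }                           // duplicates fall through and are ignored

  def total: Long = count
}
```

Note that the processed-id set grows without bound in this sketch; a production version would prune ids once they can no longer be redelivered.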
