2013-01-02 17 views

Implementing EventBus actors with weak references?

Is it correct to say that unreferenced actors remain subscribed to the event stream? At least, that is what I observe when experimenting with Akka...

I'm trying to implement weak references for actors in an EventBus scenario. In these cases the event listeners/actors typically come and go, unlike independent actors that are supposed to be present all the time. Explicit unsubscribing works, of course, but I am not always able to detect the right moment to do it.

Does Akka provide for this use case?

val as = ActorSystem.create("weak") 
var actor = as.actorOf(Props[ExceptionHandler]) 
as.eventStream.subscribe(actor,classOf[Exception]) 

// an event is published & received 
as.eventStream.publish(new KnownProblem) 

//session expires or whatever that makes the actor redundant 
actor = null 
(1 to 30).foreach(_ => System.gc) 

// an event is published & STILL received 
as.eventStream.publish(new KnownProblem) 

Unsubscribe the 'actor' from the 'EventStream' using the 'classOf[Exception]' classifier? – idonnie

That would work, if only I could know that the actor is no longer referenced in any way. :-) I am trying this scenario for an HTTP session. When the application server ends the session, I get no chance to unsubscribe. This is usually handled with weak references. –

The EventBus is attached to the 'ActorSystem' via 'def eventStream', and some events are published to 'eventStream' during setup. I suggest extending 'EventBus' so that subscriptions are held as 'WeakReference[ActorRef]'-s; the 'LookupClassification' trait looks promising. A quote from the author: 'Extend EventBus and implement.' https://groups.google.com/forum/?fromgroups=#!topic/akka-user/T3-FONxoX8E A simple Akka EventBus example: https://gist.github.com/3163791 – idonnie
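The idea in the comment above, a bus that holds its subscribers only through weak references, can be sketched without Akka. The following is a minimal illustration using just the standard library. `WeakBus`, its string channels, and the function-typed handlers are hypothetical names chosen for this sketch, not part of Akka's API. Publishing resolves each reference once, skips entries that have been cleared, and prunes them from the registry.

```scala
import java.lang.ref.WeakReference

// Hypothetical helper (not an Akka class): subscribers are held weakly,
// so the bus alone never keeps a handler alive.
final class WeakBus[E] {
  private var subscribers = Map.empty[String, List[WeakReference[E => Unit]]]

  // Registers a handler on a channel and returns the weak reference that wraps it.
  def subscribe(channel: String, handler: E => Unit): WeakReference[E => Unit] = synchronized {
    val ref = new WeakReference[E => Unit](handler)
    subscribers = subscribers.updated(channel, ref :: subscribers.getOrElse(channel, Nil))
    ref
  }

  // Delivers the event to every handler that is still reachable, drops cleared
  // entries from the registry, and returns the number of deliveries.
  def publish(channel: String, event: E): Int = synchronized {
    // Resolve each reference exactly once so a handler cannot vanish between check and call.
    val resolved = subscribers.getOrElse(channel, Nil).map(ref => (ref, Option(ref.get)))
    subscribers = subscribers.updated(channel, resolved.collect { case (ref, Some(_)) => ref })
    val live = resolved.collect { case (_, Some(handler)) => handler }
    live.foreach(handler => handler(event))
    live.size
  }
}

object WeakBusDemo {
  def main(args: Array[String]): Unit = {
    val bus = new WeakBus[String]
    val handler: String => Unit = msg => println("received: " + msg)
    val ref = bus.subscribe("weak", handler)
    bus.publish("weak", "first")  // delivered while the handler is still strongly reachable
    ref.clear()                   // stands in for the collector clearing the reference
    bus.publish("weak", "second") // the cleared entry is skipped and pruned
  }
}
```

The demo calls `ref.clear()` instead of relying on `System.gc()`, because the moment at which the collector actually clears a weak reference is not deterministic.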

Answer

OK, I did not manage to implement it as intended, but the actor does stop on GC. Using Scala 2.9.2 (REPL) + Akka 2.0.3.

The EventBus with WeakReference[ActorRef] did not help, because in Akka the actor is also held elsewhere: there is the ChildrenContainer (self.children), and there may also be Monitor subscriptions to lifecycle events. The thing I have not tried is creating the actors with a dispatcher that knows only about our shiny new WeakEventBus, so maybe I missed the point?
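The reason a weak subscription alone does not help can be shown with a minimal sketch that uses only the standard library. `Session` stands in for the actor and `children` for the parent's child registry. Both are hypothetical names for illustration, not Akka classes. As long as one strong reference to the object remains, the weak reference held by the bus keeps resolving, no matter how often the collector runs.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical stand-in for an actor that is registered in two places.
final class Session(val name: String)

object StrongRefDemo {
  // Plays the role of the parent's child registry: it holds strong references.
  val children = mutable.Map.empty[String, Session]

  // Creates a session, stores it strongly in `children`, and returns only a weak reference.
  def register(name: String): WeakReference[Session] = {
    val session = new Session(name)
    children(name) = session
    new WeakReference(session)
  }

  def main(args: Array[String]): Unit = {
    val ref = register("weak")
    System.gc()
    // Still resolvable: the object is strongly reachable through `children`.
    println("reachable while registered: " + (ref.get != null))

    children.remove("weak")
    System.gc()
    // Now only weakly reachable. The collector may clear the reference,
    // but when it does so is not guaranteed.
    println("reachable after removal: " + (ref.get != null))
  }
}
```

In other words, every strong holder has to let go of the actor before a weak subscription can have any effect.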

Here is the code for the REPL (start it with the appropriate imports and :paste it in 2 steps):

// Start REPL with something like: 
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar: 
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar: 
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar" 

// :paste 1/2 
import akka.actor._ 
import akka.pattern._ 
import akka.event._ 
import akka.util._ 
import com.typesafe.config.ConfigFactory 
import akka.util.Timeout 
import akka.dispatch.Await 
import scala.ref.WeakReference 
import java.util.Comparator 
import java.util.concurrent.atomic._ 
import java.util.UUID 

case class Message(val id:String,val timestamp: Long) 
case class PostMessage(
    override val id:String=UUID.randomUUID().toString(), 
    override val timestamp: Long=new java.util.Date().getTime(), 
    text:String) extends Message(id, timestamp) 
case class MessageEvent(val channel:String, val message:Message) 

case class StartServer(nodeName: String) 
case class ServerStarted(nodeName: String, actor: ActorRef) 
case class IsAlive(nodeName: String) 
case class IsAliveWeak(nodeName: String) 
case class AmAlive(nodeName: String, actor: ActorRef) 
case object GcCheck // sent and matched as a singleton, so a case object is the idiomatic form 
case class GcCheckScheduled(isScheduled: Boolean, 
    gcFlag: WeakReference[AnyRef]) 

trait WeakLookupClassification { this: WeakEventBus ⇒ 
protected final val subscribers = new Index[Classifier, 
    WeakReference[Subscriber]](mapSize(), 
    new Comparator[WeakReference[Subscriber]] { 
      def compare(a: WeakReference[Subscriber], 
     b: WeakReference[Subscriber]): Int = { 
       if (a.get == None || b.get == None) -1 
       else compareSubscribers(a.get.get, b.get.get) 
     } 
     }) 
protected def mapSize(): Int 
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int 
protected def classify(event: Event): Classifier 
protected def publish(event: Event, subscriber: Subscriber): Unit 
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = 
    subscribers.put(to, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = 
    subscribers.remove(from, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber): Unit = 
    subscribers.removeValue(new WeakReference(subscriber)) 
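def publish(event: Event): Unit = { 
     val i = subscribers.valueIterator(classify(event)) 
     // Skip subscribers whose weak reference has already been cleared; 
     // calling .get.get on a cleared reference would throw NoSuchElementException 
     while (i.hasNext) i.next().get.foreach(s => publish(event, s)) 
} 
} 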

class WeakEventBus extends EventBus with WeakLookupClassification { 
    type Event = MessageEvent 
    type Classifier=String 
    type Subscriber = ActorRef 

    protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b 

    protected def mapSize(): Int = 10 
    protected def classify(event: Event): Classifier = event.channel 
    protected def publish(event: Event, subscriber: Subscriber): Unit = 
     subscriber ! event 
} 

lazy val weakEventBus = new WeakEventBus 

implicit val timeout = akka.util.Timeout(1000) 
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString(""" 
akka { 
    loglevel = "DEBUG" 
    actor { 
     provider = "akka.remote.RemoteActorRefProvider" 
     debug { 
      receive = on 
      autoreceive = on   
      lifecycle = on 
      event-stream = on 
     } 
    } 
    remote { 
     transport = "akka.remote.netty.NettyRemoteTransport" 
     log-sent-messages = on 
     log-received-messages = on  
    } 
} 
serverconf { 
    include "common" 
    akka { 
     actor { 
      deployment { 
     /root { 
      remote = "akka://[email protected]:2552" 
     }  
      } 
     } 
     remote { 
      netty { 
     hostname = "127.0.0.1" 
     port = 2552 
      } 
     } 
    } 
} 
""").getConfig("serverconf")) 

class Server extends Actor { 
    private[this] val scheduled = new AtomicBoolean(false) 
    private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]() 

    val gcCheckPeriod = Duration(5000, "millis") 

    override def preRestart(reason: Throwable, message: Option[Any]) { 
     self ! GcCheckScheduled(scheduled.get, gcFlagRef.get) 
     super.preRestart(reason, message) 
    } 

    def schedule(period: Duration, who: ActorRef) = 
     actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck) 

    def receive = {  
     case StartServer(nodeName) => 
      sender ! ServerStarted(nodeName, self) 
      if (scheduled.compareAndSet(false, true)) 
     schedule(gcCheckPeriod, self) 
      val gcFlagObj = new AnyRef()    
      gcFlagRef.set(new WeakReference(gcFlagObj)) 
      weakEventBus.subscribe(self, nodeName) 
      actorSystem.eventStream.unsubscribe(self)  
     case GcCheck => 
      val gcFlag = gcFlagRef.get 
      if (gcFlag == null) { 
        sys.error("gcFlag") 
      } 
      gcFlag.get match { 
        case Some(gcFlagObj) => 
          scheduled.set(true) 
          schedule(gcCheckPeriod, self) 
        case None => 
          println("Actor stopped because of GC: " + self) 
          context.stop(self) 
      } 
     case GcCheckScheduled(isScheduled, gcFlag) => 
      if (isScheduled && scheduled.compareAndSet(false, isScheduled)) { 
     gcFlagRef.compareAndSet(null, gcFlag) 
     schedule(gcCheckPeriod, self)    
      } 
     case IsAlive(nodeName) => 
      println("Im alive (default EventBus): " + nodeName) 
      sender ! AmAlive(nodeName, self) 
     case e: MessageEvent => 
      println("Im alive (weak EventBus): " + e)  
    } 
} 

// :paste 2/2 
class Root extends Actor { 
    def receive = { 
     case start @ StartServer(nodeName) => 
     val server = context.actorOf(Props[Server], nodeName) 
     context.watch(server) 
     Await.result(server ? start, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
     case started @ ServerStarted(nodeName, _) => 
      sender ! started 
     case _ => 
      throw new RuntimeException(
      "[S][FAIL] Could not start server: " + start) 
     } 
     case isAlive @ IsAlive(nodeName) => 
     Await.result(context.actorFor(nodeName) ? isAlive, 
     timeout.duration).asInstanceOf[AmAlive] match { 
     case AmAlive(nodeName, _) => 
      println("[S][SUCC] Server is alive : " + nodeName) 
     case _ => 
     throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)  
      } 
     case isAliveWeak @ IsAliveWeak(nodeName) =>     
     actorSystem.eventStream.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-default"))) 
     weakEventBus.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-weak"))) 
    } 
} 

lazy val rootActor = actorSystem.actorOf(Props[Root], "root") 

object Root { 
    def start(nodeName: String) = { 
     val msg = StartServer(nodeName) 
     var startedActor: Option[ActorRef] = None 
     Await.result(rootActor ? msg, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
      case succ @ ServerStarted(nodeName, actor) => 
      println("[S][SUCC] Server started: " + succ) 
      startedActor = Some(actor) 
      case _ => 
     throw new RuntimeException("[S][FAIL] Could not start server: " + msg) 
      } 
     startedActor 
    } 
    def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName) 
    def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName) 
} 

//////////////// 
// actual test 
Root.start("weak") 
Thread.sleep(7000L) 
System.gc() 
Root.isAlive("weak")