2015-06-11 13 views
26

Sto provando a utilizzare il metodo Source.actorRef per creare un oggetto akka.stream.scaladsl.Source. Qualcosa della formaAccesso al ActorRef sottostante di un flusso akka Fonte creata da Source.actorRef

import akka.stream.OverflowStrategy.fail 
import akka.stream.scaladsl.Source 

case class Weather(zip : String, temp : Double, raining : Boolean) 

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail) 

val sunnySource = weatherSource.filter(!_.raining) 
... 

La mia domanda è: Come posso inviare i dati al mio oggetto source basato ActorRef?

ho assunto l'invio di messaggi alla fonte era qualcosa della forma

//does not compile 
weatherSource ! Weather("90210", 72.0, false) 
weatherSource ! Weather("02139", 32.0, true) 

Ma weatherSource non dispone di un operatore o di !tell metodo.

Il documentation non è troppo descrittiva su come utilizzare Source.actorRef, semplicemente dice che si può ...

Grazie in anticipo per la tua opinione e la risposta.

risposta

22

Hai bisogno di un Flow:

import akka.stream.OverflowStrategy.fail 
    import akka.stream.scaladsl.Source 
    import akka.stream.scaladsl.{Sink, Flow} 

    case class Weather(zip : String, temp : Double, raining : Boolean) 

    val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail) 

    val sunnySource = weatherSource.filter(!_.raining) 

    val ref = Flow[Weather] 
    .to(Sink.ignore) 
    .runWith(sunnySource) 

    ref ! Weather("02139", 32.0, true) 

Ricordate che questo è tutto sperimentale e può cambiare!

+0

In M5 sembra che Source.actorRef non esiste. Sai dove si è trasferito? –

+0

Sembra che in pratica abbiano cambiato questa impostazione per passare un oggetto alla fonte. La documentazione aggiornata è qui http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html – Noah

+0

1.0-RC3 è la versione più recente e 'Source .actorRef' vive ancora nello stesso posto lì: http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/#akka.stream.scaladsl.Source$ – jrudolph

5

Come @Noah sottolinea la natura sperimentale di akka-stream, la sua risposta potrebbe non funzionare con la versione 1.0. Ho dovuto seguire l'esempio dato da this example:

implicit val materializer = ActorMaterializer() 
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run() 
actorRef ! TweetInfo(...) 
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher) 
4

grado delle ActorRef, come tutti i 'materializzato valori', diventerà accessibile solo una volta intero flusso è materializzato, o, in altre parole, quando RunnableGraph è in esecuzione.

// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph 
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println)) 

// You get ActorRef instance as a materialized value 
val actorRef1: ActorRef = rg1.run() 

// Or even more correct way: to materialize both ActorRef and future to completion 
// of the stream, so that we know when we are done: 

// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple 
// (ActorRef, Future[Done]) when you run the graph 
val rg2: RunnableGraph[(ActorRef, Future[Done])] = 
    sunnySource.toMat(Sink.foreach(println))(Keep.both) 

// You get both ActorRef and Future[Done] instances as materialized values 
val (actorRef2, future) = rg2.run() 

actorRef2 ! Weather("90210", 72.0, false) 
actorRef2 ! Weather("02139", 32.0, true) 
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream 
future onComplete { /* ... */ } 
+0

Ottenere sia ActorRef che futuro a completamento - incredibile! Grazie! – AlonL

Problemi correlati