2015-09-06 10 views
7

Devo creare un akka.stream.scaladsl.Source[T, Unit] da una raccolta di Future[T].In akka-stream come creare una fonte non ordinata da una raccolta futures

ad esempio avere una collezione di futures ritorno interi,

val f1: Future[Int] = ??? 
val f2: Future[Int] = ??? 
val fN: Future[Int] = ??? 
val futures = List(f1, f2, fN) 

come creare un

val source: Source[Int, Unit] = ??? 

da esso.

Non riesco a utilizzare il combinatore Future.sequence, da allora aspetterei che ogni futuro si completasse prima di ottenere qualsiasi cosa dalla fonte. Voglio ottenere risultati in qualsiasi ordine non appena tutti i futuri si completano.

Capisco che l'Source sia un'API puramente funzionale e non dovrebbe eseguire nulla prima di materializzarlo in qualche modo. Quindi, la mia idea è quella di utilizzare un Iterator (che è pigro) per creare una fonte:

Source {() => 
    new Iterator[Future[Int]] { 
    override def hasNext: Boolean = ??? 
    override def next(): Future[Int] = ??? 
    } 
} 

Ma questa sarebbe una fonte di future, non di valori effettivi. Potrei anche bloccare su next usando Await.result(future) ma non sono sicuro quale thread del battistrada sarà bloccato. Anche questo chiamerà i futuri in sequenza, mentre ho bisogno di un'esecuzione parallela.

UPDATE 2: si è scoperto che c'era un modo molto più semplice per farlo (grazie a Viktor Klang):

Source(futures).mapAsync(1)(identity) 

UPDATE: ecco quello che ho sulla base di @sschaef risposta:

def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = { 
    def run(actor: ActorRef): Unit = { 
    futures.foreach { future => 
     future.onComplete { 
     case Success(value) => 
      actor ! value 
     case Failure(NonFatal(t)) => 
      actor ! Status.Failure(t) // to signal error 
     } 
    } 

    Future.sequence(futures).onSuccess { case _ => 
     actor ! Status.Success(()) // to signal stream's end 
    } 
    } 

    Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run) 
} 

// ScalaTest tests follow 

import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

"futuresToSource" should "convert futures collection to akka-stream source" in { 
    val f1 = Future(1) 
    val f2 = Future(2) 
    val f3 = Future(3) 

    whenReady { 
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _) 
    } { results => 
    results should contain theSameElementsAs Seq(1, 2, 3) 
    } 
} 

it should "fail on future failure" in { 
    val f1 = Future(1) 
    val f2 = Future(2) 
    val f3 = Future.failed(new RuntimeException("future failed")) 

    whenReady { 
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed 
    } { t => 
    t shouldBe a [RuntimeException] 
    t should have message "future failed" 
    } 
} 

risposta

6

Creazione di una fonte di Futures e poi "appiattire" via mapAsync:

scala> Source(List(f1,f2,fN)).mapAsync(1)(identity) 
res0: akka.stream.scaladsl.Source[Int,Unit] = [email protected] 
+0

Cosa succede se il mio futuro è di tipo 'Futuro [Fonte [T, Unità]]' - posso fare qualcosa di meglio di 'Source (futures) .mapAsyncUnordered (1) (identity) .flatten (FlattenStrategy.concat)'? Mi piacerebbe che l'appiattimento fosse ordinato e supportasse anche il livello di parallelismo. – Tvaroh

+0

Attualmente sono (ogni volta che trovo un'ora o due) di lavorare su 'flatten (FlattenStrategy.merge)' che farebbe quello che vuoi. Nel frattempo puoi usare 'mapAsyncUnordered (par) (identity)' + un'implementazione FlexiMerge? –

+0

Viktor, non ho guardato FlexiMerge, ci provò. Grazie. – Tvaroh

5

uno dei modi più semplici per alimentare una sorgente è attraverso un attore:

import scala.concurrent.Future 
import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 

implicit val system = ActorSystem("MySystem") 

def run(actor: ActorRef): Unit = { 
    import system.dispatcher 
    Future { Thread.sleep(100); actor ! 1 } 
    Future { Thread.sleep(200); actor ! 2 } 
    Future { Thread.sleep(300); actor ! 3 } 
} 

val source = Source 
    .actorRef[Int](0, OverflowStrategy.fail) 
    .mapMaterializedValue(ref ⇒ run(ref)) 
implicit val m = ActorMaterializer() 

source runForeach { int ⇒ 
    println(s"received: $int") 
} 

L'attore viene creato tramite il metodo Source.actorRef e reso disponibile tramite il metodo mapMaterializedValue. run prende semplicemente l'attore e gli invia tutti i valori completati, a cui è possibile accedere tramite source. Nell'esempio sopra, i valori vengono inviati direttamente nel futuro, ma ciò può naturalmente essere fatto ovunque (ad esempio nella chiamata onComplete sul futuro).

+0

A proposito, perché il primo 'argomento actorRef' è' 0'? Importa? – Tvaroh

+0

Se il consumatore può prendere tutti gli elementi fuori dalla Sorgente, sicuramente non ha bisogno di una cache, quindi è 0. – sschaef

+0

Ho provato, ma zero non ha funzionato (stava lanciando un'eccezione). La taglia uguale alla dimensione della collezione dei futures ha funzionato bene. – Tvaroh

Problemi correlati