Devo creare un akka.stream.scaladsl.Source[T, Unit]
da una raccolta di Future[T]
.In akka-stream come creare una fonte non ordinata da una raccolta futures
ad esempio avere una collezione di futures ritorno interi,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
come creare un
val source: Source[Int, Unit] = ???
da esso.
Non riesco a utilizzare il combinatore Future.sequence
, da allora aspetterei che ogni futuro si completasse prima di ottenere qualsiasi cosa dalla fonte. Voglio ottenere risultati in qualsiasi ordine non appena tutti i futuri si completano.
Capisco che l'Source
sia un'API puramente funzionale e non dovrebbe eseguire nulla prima di materializzarlo in qualche modo. Quindi, la mia idea è quella di utilizzare un Iterator
(che è pigro) per creare una fonte:
Source {() =>
new Iterator[Future[Int]] {
override def hasNext: Boolean = ???
override def next(): Future[Int] = ???
}
}
Ma questa sarebbe una fonte di future, non di valori effettivi. Potrei anche bloccare su next
usando Await.result(future)
ma non sono sicuro quale thread del battistrada sarà bloccato. Anche questo chiamerà i futuri in sequenza, mentre ho bisogno di un'esecuzione parallela.
UPDATE 2: si è scoperto che c'era un modo molto più semplice per farlo (grazie a Viktor Klang):
Source(futures).mapAsync(1)(identity)
UPDATE: ecco quello che ho sulla base di @sschaef risposta:
def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
def run(actor: ActorRef): Unit = {
futures.foreach { future =>
future.onComplete {
case Success(value) =>
actor ! value
case Failure(NonFatal(t)) =>
actor ! Status.Failure(t) // to signal error
}
}
Future.sequence(futures).onSuccess { case _ =>
actor ! Status.Success(()) // to signal stream's end
}
}
Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}
// ScalaTest tests follow
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
"futuresToSource" should "convert futures collection to akka-stream source" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future(3)
whenReady {
futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
} { results =>
results should contain theSameElementsAs Seq(1, 2, 3)
}
}
it should "fail on future failure" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future.failed(new RuntimeException("future failed"))
whenReady {
futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
} { t =>
t shouldBe a [RuntimeException]
t should have message "future failed"
}
}
Cosa succede se il mio futuro è di tipo 'Futuro [Fonte [T, Unità]]' - posso fare qualcosa di meglio di 'Source (futures) .mapAsyncUnordered (1) (identity) .flatten (FlattenStrategy.concat)'? Mi piacerebbe che l'appiattimento fosse ordinato e supportasse anche il livello di parallelismo. – Tvaroh
Attualmente sono (ogni volta che trovo un'ora o due) di lavorare su 'flatten (FlattenStrategy.merge)' che farebbe quello che vuoi. Nel frattempo puoi usare 'mapAsyncUnordered (par) (identity)' + un'implementazione FlexiMerge? –
Viktor, non ho guardato FlexiMerge, ci provò. Grazie. – Tvaroh