2013-03-21 11 views
10

C'è un modo per trasformare un Seq [Futuro [X]] in un Enumerator[X]? Il caso d'uso è che voglio ottenere risorse eseguendo la scansione del Web. Questo restituirà una Sequenza di Futures, e mi piacerebbe restituire un Enumeratore che spingerà i futuri nell'ordine in cui sono finiti per la prima volta sull'Iteratee.trasformare un Seq [Futuro [X]] in un Enumeratore [X]

Sembra che lo standard Future select gist di Victor Klang possa essere utilizzato per farlo, anche se sembra piuttosto inefficiente.

Nota: I Iteratees e Enumeratore di in questione sono quelle indicate dalla versione 2.x quadro di gioco, vale a dire con le seguenti importazioni: import play.api.libs.iteratee._

risposta

2

A, migliore risposta più breve e penso che più efficiente è:

 
    def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
     def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = { 
     Future.sequence(seqFutureX).flatMap { seqX: Seq[X] => 
      seqX.foldLeft(Future.successful(i)) { 
       case (i, x) => i.flatMap(_.feed(Input.El(x))) 
      } 
     } 
     } 
    } 

+0

Ho modificato per utilizzare foldLeft che è più veloce di foldRight. –

+2

Attento, 'Future.sequence' restituisce un futuro fallito se uno dei futures' seqFutureX' fallisce. –

+0

e questa soluzione attende che tutti i futuri vengano completati prima di somministrare Iteratee, mentre un altro da @bblfish lo inserisce il prima possibile (senza ordine conservato!). – viktortnk

3

Utilizzando Victor Klang's select method:

 
    /** 
    * "Select" off the first future to be satisfied. Return this as a 
    * result, with the remainder of the Futures as a sequence. 
    * 
    * @param fs a scala.collection.Seq 
    */ 
    def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): 
     Future[(Try[A], Seq[Future[A]])] = { 
    @scala.annotation.tailrec 
    def stripe(p: Promise[(Try[A], Seq[Future[A]])], 
       heads: Seq[Future[A]], 
       elem: Future[A], 
       tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = { 
     elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) } 
     if (tail.isEmpty) p.future 
     else stripe(p, heads :+ elem, tail.head, tail.tail) 
    } 
    if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!")) 
    else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail) 
    } 
} 

posso quindi ottenere quello che ho necessario con

 
    Enumerator.unfoldM(initialSeqOfFutureAs){ seqOfFutureAs => 
     if (seqOfFutureAs.isEmpty) { 
      Future(None) 
     } else { 
      FutureUtil.select(seqOfFutureAs).map { 
      case (t, seqFuture) => t.toOption.map { 
       a => (seqFuture, a) 
      } 
      } 
     } 
    } 

+0

Sono un po 'preoccupato che usare l'implementazione selezionata di Victor Klang non è abbastanza efficiente. In questo algoritmo dobbiamo attraversare l'intera sequenza, che richiede che ogni Futuro sia registrato con una nuova Promessa su ogni passaggio. Dovrebbe essere possibile creare un algoritmo dove è necessario farlo una volta sola ... Forse si tratta solo di sottoclasse di Enumerator e registrazione di ogni futuro nella sequenza con l'enumeratore. –

0

È possibile costruirne uno utilizzando il servizio di esecuzione di Java Executor (JavaDoc). L'idea è di creare una sequenza di nuovi futures, ognuno utilizzando ExecutorCompletionService.take() per attendere il risultato successivo. Ogni futuro inizierà, quando il futuro precedente avrà il suo risultato.

Ma per favore sappiate che questo potrebbe non essere così efficiente, perché molta sincronizzazione sta accadendo dietro le quinte. Potrebbe essere più efficiente utilizzare una riduzione della mappa parallela per il calcolo (ad esempio utilizzando ParSeq di Scala) e lasciare che l'Enumeratore attenda il risultato completo.

+0

"ogni futuro inizierà quando il futuro precedente ha un risultato": sembra che stia bloccando. Nel codice fornito nella mia risposta sopra tutti i futures in 'seqOfFuturesA' sono eseguiti in parallelo. –

0

ATTENZIONE: Non compilato prima di rispondere

Che dire qualcosa di simile:

def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
    def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = 
    Future.fold(seqFutureX)(i){ case (i, x) => i.flatMap(_.feed(Input.El(x)))) } 
} 
+0

La firma della piega è '' 'def fold [T, R] (futures: scala.TraversableOnce [Future [T]]) (zero: R) (foldFun: (R, T) => R) (implicito executor: ExecutionContext): Future [R] '' ' ma il tuo codice ha la firma ' '' piega [T, R] (futures: Seq [Futuro [T]]) (zero: Iteratee [T, R ]) (foldFun: (R, T) => Future [R]) (esecutore implicito: ExecutionContext): Future [R] '' ' c'è un problema con foldfun, perché' i.flatMap (_. feed (Input .El (x)) 'restituisce un' Future [R] 'non un' R' –

+0

ma io sono di tipo "Iteratee [X, A]", e flatMap dovrebbe restituire un "Iteratee [X, A]", no ? (dato che il feed restituisce Iteratee [X, A]) –

+0

"Iteratee [E, A]" di Play definisce "flatMap" come: "def flatMap [B] (f: A => Iteratee [E, B]): Iteratee [E, B] 'in modo che il tuo caso debba essere realmente scritto come' case (i, x) => i.flatMap (a => ...) '. E quindi 'a' non è più un' Iteratee', e quindi non ha un metodo 'feed'. Se invece si tenta di fare 'case (i, x) => i feed (Input.El (x))' allora si finisce con un 'Future [Iteratee [...]]' che non è cosa vuole la piega La cosa bella è che senza il sistema di tipo Scala non penso che avrei mai trovato la risposta ... :-) –

0

Qui è qualcosa che ho trovato a portata di mano,

def unfold[A,B](xs:Seq[A])(proc:A => Future[B])(implicit errorHandler:Throwable => B):Enumerator[B] = { 
    Enumerator.unfoldM (xs) { xs => 
     if (xs.isEmpty) Future(None) 
     else proc(xs.head) map (b => Some(xs.tail,b)) recover { 
      case e => Some((xs.tail,errorHandler(e))) 
     } 
    } 
} 

def unfold[A,B](fxs:Future[Seq[A]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = { 

    (unfold(Seq(fxs))(fxs => fxs)(errorHandler1)).flatMap(unfold(_)(proc)(errorHandler)) 
} 

def unfoldFutures[A,B](xsfxs:Seq[Future[Seq[A]]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = { 

    xsfxs.map(unfold(_)(proc)).reduceLeft((a,b) => a.andThen(b)) 
} 
1

Mi rendo conto che la questione è un po 'già vecchio, ma in base alla risposta di Santhosh e il built-in Enumterator.enumerate() implementazione mi si avvicinò con il seguente:

def enumerateM[E](traversable: TraversableOnce[Future[E]])(implicit ec: ExecutionContext): Enumerator[E] = { 
    val it = traversable.toIterator 
    Enumerator.generateM { 
    if (it.hasNext) { 
     val next: Future[E] = it.next() 
     next map { 
     e => Some(e) 
     } 
    } else { 
     Future.successful[Option[E]] { 
     None 
     } 
    } 
    } 
} 

Si noti che a differenza del primo Viktor- select-based-solution questo conserva l'ordine, ma è ancora possibile avviare tutti i calcoli in modo asincrono prima. Così, per esempio, è possibile effettuare le seguenti operazioni:

// For lack of a better name 
def mapEachM[E, NE](eventuallyList: Future[List[E]])(f: E => Future[NE])(implicit ec: ExecutionContext): Enumerator[NE] = 
    Enumerator.flatten(
    eventuallyList map { list => 
     enumerateM(list map f) 
    } 
) 

Quest'ultimo metodo era in realtà quello che cercavo quando mi sono imbattuto in questo thread. Spero che aiuti qualcuno! :)

0

Vorrei proporre l'uso di una trasmissione

def seqToEnumerator[A](futuresA: Seq[Future[A]])(defaultValue: A, errorHandler: Throwable => A): Enumerator[A] ={ 
    val (enumerator, channel) = Concurrent.broadcast[A] 
    futuresA.foreach(f => f.onComplete({ 
     case Success(Some(a: A)) => channel.push(a) 
     case Success(None) => channel.push(defaultValue) 
     case Failure(exception) => channel.push(errorHandler(exception)) 
    })) 
    enumerator 
    } 

ho aggiunto ErrorHandling e defaultValues ​​ma si può saltare quelli utilizzando onSuccess o onFailure, invece di onComplete

Problemi correlati