2013-03-09 8 views
12

Desidero creare un enumerato di riproduzione 2 che acquisisca valori e li emetta, suddivisi in blocchi, ogni x secondi/millisecondi. In questo modo, in un ambiente Websocket multiutente con un sacco di input dell'utente, si potrebbe limitare il numero di frame ricevuti al secondo.Creazione di un chunking basato sul tempo Enumerato

So che è possibile raggruppare un certo numero di elementi insieme in questo modo:

val chunker = Enumeratee.grouped(
    Traversable.take[Array[Double]](5000) &>> Iteratee.consume() 
) 

Esiste un modo integrato per effettuare questa operazione in base al tempo piuttosto che in base al numero di articoli?

Stavo pensando di farlo in qualche modo con un lavoro Akka programmato, ma a prima vista questo sembra inefficiente, e non sono sicuro che si verifichino problemi di concurenza.

+0

questa è una domanda negativa. – tailor

+1

@tailor Ti piacerebbe elaborare perché pensi che sia male o come migliorarlo? – Carsten

risposta

3

Che ne dici di questo? Spero che questo sia utile per te.

package controllers 

import play.api._ 
import play.api.Play.current 
import play.api.mvc._ 
import play.api.libs.iteratee._ 
import play.api.libs.concurrent.Akka 
import play.api.libs.concurrent.Promise 

object Application extends Controller { 

    def index = Action { 
    val queue = new scala.collection.mutable.Queue[String] 
    Akka.future { 
     while(true){ 
     Logger.info("hogehogehoge") 
     queue += System.currentTimeMillis.toString 
     Thread.sleep(100) 
     } 
    } 
    val timeStream = Enumerator.fromCallback {() => 
     Promise.timeout(Some(queue), 200) 
    } 
    Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue => 
     var str = "" 
     while(queue.nonEmpty){ 
     str += queue.dequeue + ", " 
     } 
     str 
    }))) 
    } 

} 

E questo documento è anche utile per voi. http://www.playframework.com/documentation/2.0/Enumerators

UPDATE Questo è per la versione play2.1.

package controllers 

import play.api._ 
import play.api.Play.current 
import play.api.mvc._ 
import play.api.libs.iteratee._ 
import play.api.libs.concurrent.Akka 
import play.api.libs.concurrent.Promise 
import scala.concurrent._ 
import ExecutionContext.Implicits.global 

object Application extends Controller { 

    def index = Action { 
    val queue = new scala.collection.mutable.Queue[String] 
    Akka.future { 
     while(true){ 
     Logger.info("hogehogehoge") 
     queue += System.currentTimeMillis.toString 
     Thread.sleep(100) 
     } 
    } 
    val timeStream = Enumerator.repeatM{ 
     Promise.timeout(queue, 200) 
    } 
    Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue => 
     var str = "" 
     while(queue.nonEmpty){ 
     str += queue.dequeue + ", " 
     } 
     str 
    }))) 
    } 

} 
+1

Grazie per la risposta! Sfortunatamente, 'Enumerator.fromCallback' è diventato obsoleto a partire dalla versione 2.1 (e si stanno allontanando da' Promise' di Play e verso 'Future' di Scala). Forse conosci il modo "aggiornato" per farlo? – Carsten

+0

Grazie per avermi insegnato. Non sapevo che 'Enumerator.fromCallback' ha una bee deprecata. Ho letto [qui] (https://github.com/playframework/Play20/blob/master/framework/src/iteratees/src/main/scala/play/api/libs/iteratee/Enumerator.scala) e ho aggiornato il codice . – buster84

2

Qui ho rapidamente definito un iteratee che avrà valori da un ingresso per una lunghezza t tempo fisso misurata in millisecondi e un enumeratee che permetterà di gruppo e ulteriore processo un flusso di input diviso in segmenti costruiti entro tale lunghezza t. Si affida a JodaTime per tenere traccia di quanto tempo è trascorso da quando è iniziata l'iteratee.

def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = { 
    var startTime = new Instant() 

    def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = { 
    val timePassed = new Interval(startTime, new Instant()).toDurationMillis 

    input match { 
     case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) } 
     case Input.Empty => Cont[E, List[E]](i => step(state)(i)) 
     case Input.El(e) => 
     if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) } 
     else Cont[E, List[E]](i => step(e::state)(i)) 
    } 
    } 

    Cont(step(List[E]())) 
} 

def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis)) 
+0

Grazie. Il tuo metodo è elegante. Ho accettato l'altra risposta perché utilizza meno tempo della CPU (almeno la metà), spero che vada bene. Inoltre, a partire da Scala 2.10, è possibile utilizzare la classe 'Deadline' di Scala per aggiornare il timer. – Carsten

Problemi correlati