2016-04-04 14 views
13

Vorrei utilizzare un SourceQueue per spostare gli elementi dinamicamente in una sorgente di Akka Stream. Il controller di riproduzione necessita di una sorgente per poter trasmettere un risultato utilizzando il metodo chuncked.
As Play utilizza il proprio Akka Stream Sink sotto il cofano, non posso materializzare la coda di origine me stesso utilizzando un sink perché la sorgente sarebbe stata consumata prima che venisse usata dal metodo chunked (eccetto se usassi il seguente hack).Come utilizzare un flusso Akka SourceQueue con PlayFramework

sono in grado di farlo funzionare se pre-materializzare l'coda di origine utilizzando un editore reattivo-corsi d'acqua, ma è una sorta di 'trucco sporco':

def sourceQueueAction = Action{ 

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run() 

    //stupid example to push elements dynamically 
    val tick = Source.tick(0 second, 1 second, "tick") 
    tick.runForeach(t => queue.offer(t)) 

    Ok.chunked(Source.fromPublisher(pub)) 
    } 

C'è un modo più semplice per usare un Akka Streams SourceQueue con PlayFramework?

Grazie

risposta

19

La soluzione è quella di utilizzare mapMaterializedValue sulla sorgente per ottenere un futuro di sua materializzazione coda:

def sourceQueueAction = Action { 
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail)) 

    futureQueue.map { queue => 
     Source.tick(0.second, 1.second, "tick") 
     .runForeach (t => queue.offer(t)) 
    } 
    Ok.chunked(queueSource) 

    } 

    //T is the source type, here String 
    //M is the materialization type, here a SourceQueue[String] 
    def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = { 
    val p = Promise[M] 
    val s = src.mapMaterializedValue { m => 
     p.trySuccess(m) 
     m 
    } 
    (s, p.future) 
    } 
+0

Perché se lo faccio 'queueSource.map {} _.toUpperCase' per esempio io non ottieni una fonte [String, NotUsed]? Invece restituisce l'errore 'Expression di tipo queueSource.Repr [String] non conforme al tipo previsto Source [String, NotUsed] .' Dove eseguirai le trasformazioni agli elementi della sorgente? Come le zecche in [il tuo esempio] (http://loicdescotte.github.io/posts/play-akka-streams-queue/) – gabrielgiussi

Problemi correlati