2016-05-10 12 views
8

Sto provando ad integrare un flusso basato su stream akka nella mia app Play 2.5. L'idea è che puoi eseguire lo streaming in una foto, quindi scrivere su disco come file raw, una versione con miniature e una versione con watermark.Come assemblare un sink di Akka Streams da più scritture di file?

sono riuscito a ottenere questo lavoro utilizzando un qualcosa di grafico come questo:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray}) 
            .map(_.result().toArray) 

def toByteArray = Flow[ByteString].map(b => b.toArray) 

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder => 
    import GraphDSL.Implicits._ 
    val streamFan = builder.add(Broadcast[ByteString](3)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 
    val output = builder.add(Flow[ByteString].map(x => Success(Done))) 

    val rawFileSink = FileIO.toFile(file) 
    val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
    val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

    streamFan.out(0) ~> rawFileSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 
    streamFan.out(2) ~> output.in 

    byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink 
    byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink 

    FlowShape(streamFan.in, output.out) 
}) 

graph 

}

Poi ho cablare in al mio controller di gioco utilizzando un accumulatore come questo:

val sink = Sink.head[Try[Done]] 

val photoStorageParser = BodyParser { req => 
    Accumulator(sink).through(graph).map(Right.apply) 
} 

Il problema è che i miei due sink di file non sono stati completati e sto ottenendo zero dimensioni per entrambi i file elaborati, ma non quello grezzo. La mia teoria è che l'accumulatore è in attesa solo su uno degli output del mio fan, quindi quando lo stream di input è completo e il mio byteAccumulator sputa il file completo, al termine dell'elaborazione, il gioco ha il valore materializzato dall'output .

Quindi, le mie domande sono:
Sono sulla buona strada con questo per quanto riguarda il mio approccio? Qual è il comportamento previsto per l'esecuzione di un grafico come questo? Come posso riunire tutti i miei lavandini per formare un dissipatore finale?

+0

Penso anche che il motivo è che i flussi non vengono uniti dopo l'elaborazione. Hai provato 'Sink.combine' (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat

+0

Sì, ho dato a Sink.com una possibilità, ma questo unifica diversi sink per inviare _to_ come un fan out. Penso di essere alla ricerca di un fan, ma sembra che tu non possa farlo solo con le fonti! – Tompey

+0

Questo sembra essere un esempio simile: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Forse devi restituire un 'SinkShape' invece di un' FlowShape' per dichiarare che il tuo stream è finito? – devkat

risposta

7

Ok, dopo un po 'di aiuto (Andreas era sulla strada giusta), sono arrivato a questa soluzione, che fa il trucco:

val rawFileSink = FileIO.toFile(file) 
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) { 
    implicit builder => (rawSink, thumbSink, waterSink) => { 
    val streamFan = builder.add(Broadcast[ByteString](2)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 

    streamFan.out(0) ~> rawSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink 
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink 

    SinkShape(streamFan.in) 
    } 
}) 

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done))) 

Dopo di che è morto ogni chiamare questo dal gioco:

val photoStorageParser = BodyParser { req => 
    Accumulator(theSink).map(Right.apply) 
} 

def createImage(path: String) = Action(photoStorageParser) { req => 
    Created 
} 
+0

grazie amico, ho appena avuto un compito simile e non riuscivo a capire come aspettare tutti i Futures materializzati. La tua soluzione ha aiutato molto e funziona! –

+0

Ciao! Che ne dici di un numero variabile di lavandini per combinazione? – Alexander

Problemi correlati