Sto provando ad integrare un flusso basato su stream akka nella mia app Play 2.5. L'idea è che puoi eseguire lo streaming in una foto, quindi scrivere su disco come file raw, una versione con miniature e una versione con watermark.Come assemblare un sink di Akka Streams da più scritture di file?
sono riuscito a ottenere questo lavoro utilizzando un qualcosa di grafico come questo:
val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
.map(_.result().toArray)
def toByteArray = Flow[ByteString].map(b => b.toArray)
val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
val streamFan = builder.add(Broadcast[ByteString](3))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
val output = builder.add(Flow[ByteString].map(x => Success(Done)))
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
streamFan.out(0) ~> rawFileSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
streamFan.out(2) ~> output.in
byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
FlowShape(streamFan.in, output.out)
})
graph
}
Poi ho cablare in al mio controller di gioco utilizzando un accumulatore come questo:
val sink = Sink.head[Try[Done]]
val photoStorageParser = BodyParser { req =>
Accumulator(sink).through(graph).map(Right.apply)
}
Il problema è che i miei due sink di file non sono stati completati e sto ottenendo zero dimensioni per entrambi i file elaborati, ma non quello grezzo. La mia teoria è che l'accumulatore è in attesa solo su uno degli output del mio fan, quindi quando lo stream di input è completo e il mio byteAccumulator sputa il file completo, al termine dell'elaborazione, il gioco ha il valore materializzato dall'output .
Quindi, le mie domande sono:
Sono sulla buona strada con questo per quanto riguarda il mio approccio? Qual è il comportamento previsto per l'esecuzione di un grafico come questo? Come posso riunire tutti i miei lavandini per formare un dissipatore finale?
Penso anche che il motivo è che i flussi non vengono uniti dopo l'elaborazione. Hai provato 'Sink.combine' (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat
Sì, ho dato a Sink.com una possibilità, ma questo unifica diversi sink per inviare _to_ come un fan out. Penso di essere alla ricerca di un fan, ma sembra che tu non possa farlo solo con le fonti! – Tompey
Questo sembra essere un esempio simile: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Forse devi restituire un 'SinkShape' invece di un' FlowShape' per dichiarare che il tuo stream è finito? – devkat