2015-01-05 8 views
8

Esistono esempi di codice dell'utilizzo delle librerie org.reactivestreams per elaborare grandi flussi di dati utilizzando Java NIO (per prestazioni elevate)? Sto puntando all'elaborazione distribuita, quindi gli esempi che usano Akka sarebbero i migliori, ma posso capirlo.Come utilizzare Reactive Stream per l'elaborazione binaria NIO?

sembra ancora essere il caso che la maggior parte (spero non tutti) gli esempi di leggere i file in località Scala di Source (non binario) o direttamente Java NIO (e anche cose come Files.readAllBytes!)

Forse c'è un modello di attivatore che ho perso? (Akka Streams with Scala! si avvicina a tutto ciò di cui ho bisogno eccetto il lato binario/NIO)

risposta

4

In realtà utilizziamo gli stream akka per elaborare file binari. E 'stato un po' difficile da far funzionare le cose in quanto non vi era alcuna documentazione intorno a questo, ma questo è ciò che siamo venuti su con:

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile)) 
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte) 
val binSource = Source(binStream) 

Una volta che avete binSource, che è un Akka Source[Byte] si può andare avanti e inizia ad applicare qualsiasi trasformazione del flusso (map, flatMap, transform, ecc.). Questa funzionalità sfrutta lo apply dell'oggetto companion apply che accetta uno Iterable, passando in una scala Stream che dovrebbe leggere pigramente i dati e renderla disponibile alle proprie trasformazioni.

EDIT

Come Konrad sottolineato nella sezione commenti, un flusso può essere un problema con file di grandi dimensioni a causa del fatto che esso esegue memoization degli elementi che incontra come è pigramente costruendo il flusso. Questo può portare a situazioni di memoria insufficienti se non si presta attenzione. Tuttavia, se si guardano le docs per Stream c'è un suggerimento per evitare Memoizzazione costruire in memoria:

Bisogna essere prudenti di Memoizzazione; se non stai attento, puoi consumare molto rapidamente quantità di memoria grandi . La ragione di ciò è che la memoizzazione del flusso crea una struttura molto simile a scala.collection.immutable.List. Fintanto che qualcosa sta trattenendo su la testa, la testa tiene la coda, e così continua in modo ricorsivo. Se, d'altro canto, non c'è nulla che si aggrappa alla testina (ad esempio, abbiamo usato def per definire il flusso) e una volta che non è più utilizzato direttamente, lo stesso scompare.

Quindi tenendo conto di, si potrebbe modificare il mio esempio originale come segue:

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile))  
val binSource = Source(() => binStream(inputStream).iterator) 

def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte) 

Quindi l'idea è quello di costruire il Stream tramite un def e non assegnare a un val e poi subito a ottenere il iterator da esso e usarlo per inizializzare l'Akka Source. Impostare le cose in questo modo dovrebbe evitare i problemi con la momoizzazione. Ho eseguito il vecchio codice su un file di grandi dimensioni ed è stato in grado di produrre una situazione OutOfMemory eseguendo uno foreach su Source. Quando l'ho passato al nuovo codice sono riuscito a evitare questo problema.

+2

l'uso di scala.collection.immutable.Stream è piuttosto pericoloso qui - utilizza la memoizzazione (!) (Consultare i documenti http://www.scala-lang.org/api/current/index.html#scala.collection .immutable.Stream), così finirà per avere l'intero file in memoria, invece di farlo scorrere attraverso (!). –

+0

@ Konrad'ktoso'Malawski, punto eccellente. Pubblicherò un aggiornamento con una soluzione alternativa per il problema della memoizzazione. – cmbaxter

+1

buon aggiornamento, esponendo l'iteratore del flusso di input funziona bene. Ricorda di chiudere la risorsa anche quando lo stream termina. –

7

Non utilizzare scala.collection.immutable.Stream per consumare file come questo, il motivo è che esegue la memoizzazione - ovvero, mentre sì è pigro, manterrà l'intero flusso bufferizzato (memoizzato) in memoria!

Questo è sicuramente non quello che vuoi quando pensi a "stream processing a file". Il motivo per cui Scala's Stream funziona in questo modo è perché in un ambiente funzionale ha perfettamente senso: è possibile evitare il calcolo dei numeri di fibbonachi ancora e ancora facilmente grazie a questo, ad esempio, per maggiori dettagli vedere lo ScalaDoc.

Akka Streams fornisce stream reattivi implementazioni e fornisce una classe FileIO che si potrebbe usare qui (sarà adeguatamente back-pressione ed estrarre i dati dal file solo quando è necessario e il resto del flusso è pronto per consumarlo) :

import java.io._ 
import akka.actor.ActorSystem 
import akka.stream.scaladsl.{ Sink, Source } 

object ExampleApp extends App { 


    implicit val sys = ActorSystem() 
    implicit val mat = FlowMaterializer() 

    FileIO.fromPath(Paths.get("/example/file.txt")) 
    .map(c ⇒ { print(c); c }) 
    .runWith(Sink.onComplete(_ ⇒ { f.close(); sys.shutdown() })) 
} 

Qui ci sono più documenti di lavorare con IO with Akka Streams si noti che questo è per l'attuale serie-come-di scrivere versione di Akka, in modo che il 2.5.x.

Spero che questo aiuti!

+0

Grazie per la fantastica risposta - Ho dovuto trovare nuovamente la mia domanda per sapere cosa stavo cercando: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/java /stream-io.html#Streaming_File_IO - e akka 2.4 è out (presumibilmente significa che è NIO 2)! (Accetterò una volta che tu o io aggiorni/crei una risposta al codice usando l'api) – Stephen

+0

Mantiene sempre l'intero flusso in memoria? O dipende da te che hai un riferimento all'inizio del flusso? La mia impressione (pazzesca?) Era che gli oggetti di "Stream" alla fine sarebbero stati deallocati se avessi elaborato la coda in modo iterativo e dimenticando la testa. – dividebyzero

+0

Si prega di leggere i documenti, li ho collegati di seguito; http://www.scala-lang.org/api/current/scala/collection/immutable/Stream.html –

Problemi correlati