In realtà utilizziamo gli stream akka per elaborare file binari. E 'stato un po' difficile da far funzionare le cose in quanto non vi era alcuna documentazione intorno a questo, ma questo è ciò che siamo venuti su con:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte)
val binSource = Source(binStream)
Una volta che avete binSource
, che è un Akka Source[Byte]
si può andare avanti e inizia ad applicare qualsiasi trasformazione del flusso (map
, flatMap
, transform
, ecc.). Questa funzionalità sfrutta lo apply
dell'oggetto companion apply
che accetta uno Iterable
, passando in una scala Stream
che dovrebbe leggere pigramente i dati e renderla disponibile alle proprie trasformazioni.
EDIT
Come Konrad sottolineato nella sezione commenti, un flusso può essere un problema con file di grandi dimensioni a causa del fatto che esso esegue memoization degli elementi che incontra come è pigramente costruendo il flusso. Questo può portare a situazioni di memoria insufficienti se non si presta attenzione. Tuttavia, se si guardano le docs per Stream c'è un suggerimento per evitare Memoizzazione costruire in memoria:
Bisogna essere prudenti di Memoizzazione; se non stai attento, puoi consumare molto rapidamente quantità di memoria grandi . La ragione di ciò è che la memoizzazione del flusso crea una struttura molto simile a scala.collection.immutable.List. Fintanto che qualcosa sta trattenendo su la testa, la testa tiene la coda, e così continua in modo ricorsivo. Se, d'altro canto, non c'è nulla che si aggrappa alla testina (ad esempio, abbiamo usato def per definire il flusso) e una volta che non è più utilizzato direttamente, lo stesso scompare.
Quindi tenendo conto di, si potrebbe modificare il mio esempio originale come segue:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binSource = Source(() => binStream(inputStream).iterator)
def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
Quindi l'idea è quello di costruire il Stream
tramite un def
e non assegnare a un val
e poi subito a ottenere il iterator
da esso e usarlo per inizializzare l'Akka Source
. Impostare le cose in questo modo dovrebbe evitare i problemi con la momoizzazione. Ho eseguito il vecchio codice su un file di grandi dimensioni ed è stato in grado di produrre una situazione OutOfMemory
eseguendo uno foreach
su Source
. Quando l'ho passato al nuovo codice sono riuscito a evitare questo problema.
l'uso di scala.collection.immutable.Stream è piuttosto pericoloso qui - utilizza la memoizzazione (!) (Consultare i documenti http://www.scala-lang.org/api/current/index.html#scala.collection .immutable.Stream), così finirà per avere l'intero file in memoria, invece di farlo scorrere attraverso (!). –
@ Konrad'ktoso'Malawski, punto eccellente. Pubblicherò un aggiornamento con una soluzione alternativa per il problema della memoizzazione. – cmbaxter
buon aggiornamento, esponendo l'iteratore del flusso di input funziona bene. Ricorda di chiudere la risorsa anche quando lo stream termina. –