Sto provando a utilizzare il pacchetto scalaz iteratee per elaborare un file zip di grandi dimensioni in uno spazio costante. Ho un processo di lunga durata che devo eseguire su ogni file nel file zip. Questi processi possono (e dovrebbero) essere eseguiti in parallelo.Scalaz 7 Iteratee per elaborare un file zip di grandi dimensioni (OutOfMemoryError)
Ho creato un EnumeratorT
che gonfia ogni ZipEntry
in un oggetto File
. La firma appare come:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
voglio collegare un IterateeT
che eseguirà il processo di lunga durata su ogni file. Io fondamentalmente finire con qualcosa di simile:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
Quando provo a farlo funzionare:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
ottengo un messaggio java.lang.OutOfMemoryError: Java heap space
. Questo ha senso per me, dal momento che sta cercando di costruire una lista enorme in memoria di tutti questi oggetti IO
e Promise
.
alcune domande:
- Qualcuno ha qualche idea su come evitare questo? Sembra che mi sto avvicinando al problema in modo errato, perché mi interessa solo lo
longRunningProcess
per i suoi effetti collaterali. - L'approccio
Enumerator
è l'approccio sbagliato?
Sono praticamente fuori di idee, quindi tutto può essere d'aiuto.
Grazie!
Update # 1
Ecco la traccia dello stack:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
Attualmente sto prendendo il consiglio di nadavwr per assicurarsi che tutto si comporta come penso che sia. Segnalerò eventuali aggiornamenti.
Aggiornamento # 2
usando le idee da entrambe le risposte di seguito, ho trovato una soluzione decente. Come suggerito da huynhjl (e ho verificato l'utilizzo del suggerimento di nadavwr di analizzare il dump dell'heap), consume
causava la memorizzazione di ogni gonfiato, motivo per cui il processo stava esaurendo la memoria. Ho modificato consume
in foldM
e aggiornato il processo di lunga durata per restituire semplicemente un valore Promise[IOE[Unit]]
anziché un riferimento al file. Alla fine ho una collezione di tutte le IoException. Ecco la soluzione operativa:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
Questa soluzione gonfia ogni voce caricandola in modo asincrono. Alla fine, ho una lista enorme di oggetti Promise
soddisfatti che contengono errori. Non ho ancora pienamente convinto che questo sia l'uso corretto di un Iteratee, ma ora ho diversi pezzi riutilizzabili e componibili che posso usare in altri pezzi del nostro sistema (questo è un modello molto comune per noi).
Grazie per tutto il vostro aiuto!
Cosa fa il processo lungo? Calcola qualcosa dal contenuto zip? – huynhjl
Ogni file nel file zip è un'immagine. Il lungo processo carica quel file su Rackspace CloudFiles. Una volta capito, avrò bisogno di aggiungere processi aggiuntivi che ridimensionino le immagini e poi le carichiamo. –
Iteratees sembra l'astrazione sbagliata per questo lavoro, dal momento che si desidera parallelizzare il carico di lavoro. Gli attori avrebbero funzionato meglio, credo. – huynhjl