2013-04-26 14 views
5

Sto provando a utilizzare il pacchetto scalaz iteratee per elaborare un file zip di grandi dimensioni in uno spazio costante. Ho un processo di lunga durata che devo eseguire su ogni file nel file zip. Questi processi possono (e dovrebbero) essere eseguiti in parallelo.Scalaz 7 Iteratee per elaborare un file zip di grandi dimensioni (OutOfMemoryError)

Ho creato un EnumeratorT che gonfia ogni ZipEntry in un oggetto File. La firma appare come:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO] 

voglio collegare un IterateeT che eseguirà il processo di lunga durata su ogni file. Io fondamentalmente finire con qualcosa di simile:

type IOE[A] = IoExceptionOr[A] 

def action(f:File):IO[List[Promise[IOE[File]]]] = (
    consume[Promise[IOE[File]], IO, List] %= 
    map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] = 
    Promise { Thread.sleep(5000); iof } 

Quando provo a farlo funzionare:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get 

ottengo un messaggio java.lang.OutOfMemoryError: Java heap space. Questo ha senso per me, dal momento che sta cercando di costruire una lista enorme in memoria di tutti questi oggetti IO e Promise.

alcune domande:

  • Qualcuno ha qualche idea su come evitare questo? Sembra che mi sto avvicinando al problema in modo errato, perché mi interessa solo lo longRunningProcess per i suoi effetti collaterali.
  • L'approccio Enumerator è l'approccio sbagliato?

Sono praticamente fuori di idee, quindi tutto può essere d'aiuto.

Grazie!

Update # 1

Ecco la traccia dello stack:

[error] java.lang.OutOfMemoryError: Java heap space 
[error]   at scalaz.Free.flatMap(Free.scala:46) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 

Attualmente sto prendendo il consiglio di nadavwr per assicurarsi che tutto si comporta come penso che sia. Segnalerò eventuali aggiornamenti.

Aggiornamento # 2

usando le idee da entrambe le risposte di seguito, ho trovato una soluzione decente. Come suggerito da huynhjl (e ho verificato l'utilizzo del suggerimento di nadavwr di analizzare il dump dell'heap), consume causava la memorizzazione di ogni gonfiato, motivo per cui il processo stava esaurendo la memoria. Ho modificato consume in foldM e aggiornato il processo di lunga durata per restituire semplicemente un valore Promise[IOE[Unit]] anziché un riferimento al file. Alla fine ho una collezione di tutte le IoException. Ecco la soluzione operativa:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
    foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %= 
    map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] = 
    Promise { Thread.sleep(5000); iof.map(println) } 

Questa soluzione gonfia ogni voce caricandola in modo asincrono. Alla fine, ho una lista enorme di oggetti Promise soddisfatti che contengono errori. Non ho ancora pienamente convinto che questo sia l'uso corretto di un Iteratee, ma ora ho diversi pezzi riutilizzabili e componibili che posso usare in altri pezzi del nostro sistema (questo è un modello molto comune per noi).

Grazie per tutto il vostro aiuto!

+0

Cosa fa il processo lungo? Calcola qualcosa dal contenuto zip? – huynhjl

+0

Ogni file nel file zip è un'immagine. Il lungo processo carica quel file su Rackspace CloudFiles. Una volta capito, avrò bisogno di aggiungere processi aggiuntivi che ridimensionino le immagini e poi le carichiamo. –

+0

Iteratees sembra l'astrazione sbagliata per questo lavoro, dal momento che si desidera parallelizzare il carico di lavoro. Gli attori avrebbero funzionato meglio, credo. – huynhjl

risposta

4

Non utilizzare consume. Vedere la mia altra risposta recente: How to use IO with Scalaz7 Iteratees without overflowing the stack?

foldM potrebbe essere una scelta migliore.

Prova anche a mappare il file a qualcos'altro (come un codice di ritorno riuscito) per vedere se ciò consente alla JVM di raccogliere le voci di zip inflazionate.

+0

Grazie per la risposta. Alla fine, usare 'foldM' sembrava essere la chiave. –

0

ho iniziato la risposta, dopo una rapida lettura attraverso, e in qualche modo aveva 'di stack overflow' bloccato nella mia mente, invece di 'errore di memoria' ... Deve essere l'URL :-)

Eppure, i calcoli funzionali basati sulle ricorsioni sono suscettibili di impilare gli overflow, quindi ho lasciato in sospeso la risposta per qualsiasi corpo che inciampa e prometto di cercare di trovare una risposta più pertinente.

Se quello che hai ottenuto è stato un overflow di stack, avresti bisogno di un 'trampolino', un costrutto che aumenta il tuo calcolo fuori dallo stack tra le ricorsioni.

Vedere la sezione "Scala senza stack con le monade libere" in Learning Scalaz Day 18, parte dell'eccellente serie di post di @ eed3si9n.

Vedere anche this gist di @mpilquist, a dimostrazione di un iteratoe trampolino.

+1

Haha, stackoverflow.com è un nome sfortunato quando si parla di processi funzionali di lunga durata. –

1

Quanto costa (in termini di memoria è il vostro longRunningProcess? Che ne dite di file di deflazione? Sono in corso di esecuzione il numero di volte che si aspetta che siano? (Un semplice contatore sarebbe utile)

Una traccia dello stack sarà

Se si vuole essere certi di cosa sta assorbendo così tanta memoria, è possibile utilizzare l'argomento JVM -XX:+HeapDumpOnOutOfMemoryError e quindi analizzarlo con VisualVM, Eclipse, in modo da determinare la goccia che ha fatto traboccare il vaso. MAT o altri analizzatori di heap

Follo wup

Mi sembra strano che stiate enumerando le promesse. È controintuitivo dare il via a un calcolo indipendente sia dall'enumeratore che dall'iterazione. Una soluzione basata su iteratee potrebbe essere meglio servita da un enumeratore che restituisce elementi "inerti" invece di promesse. Sfortunatamente, ciò renderebbe la gestione dei singoli file seriali, ma questo è iterato per ya: l'elaborazione del flusso non bloccante.

Una soluzione basata su attori si adatterebbe meglio a IMHO, ma sia gli attori che gli iterate (in particolare il secondo) sembrano eccessivi per quello che stai cercando di realizzare (almeno le parti che stai condividendo).

Si prega di considerare semplici futuri/promesse dal pacchetto scala.concurrent di Scala 2.10, e sii sicuro di dare un'occhiata anche alle collezioni parallele di Scala. Non introdurrei ulteriori concetti nel codice prima che questi si dimostrino insufficienti. Prova a definire un ExecutionContext a dimensione fissa per limitare il parallelismo.

+0

Ottimo consiglio. Seguirò passo passo la procedura per garantire che tutto venga eseguito come suppongo sia. Ho aggiornato la mia domanda sopra con la traccia dello stack. Proverò la discarica dell'heap successiva. Grazie! –

+0

Per quanto riguarda il follow-up: concordo con le vostre preoccupazioni sull'utilizzo di Iteratee per questo processo. Da quello che ho postato, sembra decisamente eccessivo. Tuttavia, lo schema di download di un file (o file), lo streaming dei contenuti, l'elaborazione di ogni voce, quindi l'esecuzione di qualcosa con il risultato viene utilizzato in tutta la posizione nella nostra app. Mi sento come se Iteratee mi abbia dato dei bei pezzi riutilizzabili di codice che posso usare per costruire questi processi più grandi. Grazie mille per il tuo tempo e il tuo aiuto! –

Problemi correlati