5

Ho un sacco di log di server compressi snappy in S3 e devo elaborarli utilizzando lo streaming su Elastic MapReduce. Come faccio a comunicare ad Amazon e Hadoop che i log sono già compressi (prima che vengano trascinati in HFS!) In modo che possano essere decompressi prima di essere inviati allo script di Streamer Mapper?Carica file compressi con snappy in Elastic MapReduce

L'unica documentazione che riesco a trovare è qui: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/HadoopDataCompression.html#emr-using-snappy e sembra riferirsi alla compressione intermedia, non ai file che vengono compressi quando arrivano all'HFS.

BTW, sto principalmente lavorando in python, quindi punti bonus se hai una soluzione in boto!

risposta

7

La risposta è, "non può essere fatto." Almeno, non per il caso specifico di applicazione di hadoop streaming a file compressi che originano all'esterno di hadoop.

Io (a fondo!) Ho esplorato due opzioni principali per giungere a questa conclusione: (1) tentare di usare la compressione snado incorporata di hadoop come suggerito da highlycaffeated, o (2) scrivere il mio modulo di streaming per consumare e decomprimere snappy File.

Per l'opzione (1), sembra che hadoop aggiunga del markup ai file quando li comprime usando snappy. Poiché i miei file sono compressi usando snado esterno a hadoop, il codec di hadoop incorporato non può decomprimere i file.

Un sintomo di questo problema è stato un errore di spazio di heap:

2013-04-03 20:14:49,739 FATAL org.apache.hadoop.mapred.Child (main): Error running child : java.lang.OutOfMemoryError: Java heap space 
    at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:102) 
    at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:82) 
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76) 
    at java.io.InputStream.read(InputStream.java:85) 
    ... 

Quando sono passato a un'istanza di molto più grande e scaldata subito l'impostazione mapred.child.java.opts, ho ottenuto un nuovo errore:

java.io.IOException: IO error in map input file s3n://my-bucket/my-file.snappy 

Il codec di Hadoop non funziona con i file generati esternamente.

Per l'opzione (2), il problema è che lo streaming di hadoop non distingue tra \ n, \ r e \ r \ n interruzioni di riga. Dal momento che una compressione scattante finisce per spargere quei codici byte nei file compressi, questo è fatale. Ecco la mia traccia di errore:

2013-04-03 22:29:50,194 WARN org.apache.hadoop.mapred.Child (main): Error running child 
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:372) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:586) 
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) 
    ... 

Con un po 'di lavoro sulle classi Java di Hadoop (vedi here, per esempio), probabilmente potremmo risolvere il \ r vs problema \ n. Ma come ho detto inizialmente, il mio obiettivo era quello di costruire all'interno del modulo di streaming hadoop, senza toccare Java. Con questo vincolo, non sembra esserci alcun modo per risolvere questo problema.

Alla fine, sono tornato ai ragazzi che generano i file che questo cluster sta consumando e li hanno persuasi a passare a gzip o lzo.

PS - Sull'opzione (2), ho giocato con i record divisi su caratteri diversi (ad esempio textinputformat.record.delimiter = X), ma mi sentivo molto hacky e non funzionava comunque.

PPS - Un'altra soluzione alternativa sarebbe scrivere script per scaricare i file da S3, decomprimerli e quindi eseguire -copyFromLocal per inserirli in HDFS. Computazionalmente, non c'è nulla di sbagliato in questo, ma dal punto di vista del flusso di lavoro introdurrebbe tutti i tipi di problemi.

1

Supponendo che si utilizzi TextInputFormat (o una delle sue sottoclassi), i file di input compressi con estensione .snappy vengono gestiti automaticamente.

Si potrebbe prendere in considerazione l'utilizzo della compressione lzo (estensione .gz) anziché di quella scattante. Si rinuncia alla velocità di compressione per un migliore rapporto di compressione e un file di input che è divisibile. Cloudera cita questo in their blog:

One thing to note is that Snappy is intended to be used with a container format, like Sequence Files or Avro Data Files, rather than being used directly on plain text, for example, since the latter is not splittable and can’t be processed in parallel using MapReduce. This is different to LZO, where is is possible to index LZO compressed files to determine split points so that LZO files can be processed efficiently in subsequent processing.

+0

Ho sentito cosa stai dicendo di LZO contro scattante, e per gli altri che fanno cose simili in futuro, consiglierei anche LZO. Nel mio caso, il team che gestisce lo storage su S3 ha altri motivi per preferire lo snappy, e non danneggerebbe troppo male le nostre prestazioni in questo modo. Quindi stiamo attaccando con una compressione scattante. – Abe

+0

Inoltre, il rilevamento magico basato sull'estensione file che hai menzionato non funziona su molte build di hadoop. Sto usando AWS EMR AMI 2.3.3, versione 1.0.3, e non ha funzionato lì. Ho provato anche diverse altre build di EMR, ma nessuna gioia. – Abe

Problemi correlati