2015-05-05 17 views
7

Ho un compito che scrive l'output avro in più directory organizzate da pochi campi dei record di input.Uscite multiple Hadoop con esecuzione speculativa

 
For example : 
Process records of countries across years 
and write in a directory structure of country/year 
eg: 
outputs/usa/2015/outputs_usa_2015.avro 
outputs/uk/2014/outputs_uk_2014.avro 
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context); 
.... 
.... 
    multipleOutputs.write("output", avroKey, NullWritable.get(), 
      OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear()); 

Cosa commiter uscita sarebbe l'uso di codice sottostante per scrivere i output.Is non sicuro di essere utilizzato con l'esecuzione speculativa? Con esecuzione speculativo questo provoca (può provocare) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException

In questo post Hadoop Reducer: How can I output to multiple directories using speculative execution? Si suggerisce di utilizzare un committer output personalizzato

Il seguente codice da Hadoop AvroMultipleOutputs non indica alcun problema con l'esecuzione speculativa

private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, 
      String baseFileName) throws IOException, InterruptedException { 

    writer = 
       ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), 
        taskContext.getConfiguration())).getRecordWriter(taskContext); 
... 
} 

Nemmeno il documento metodo write eventuali problemi se il percorso baseoutput è al di fuori della directory di lavoro

012.
public void write(String namedOutput, Object key, Object value, String baseOutputPath) 

C'è un problema reale con AvroMultipleOutputs (un altro output) con l'esecuzione speculativa quando si scrive all'esterno della directory del lavoro? Se, allora come faccio a ignorare AvroMultipleOutputs ad avere un proprio uscita committer.I non può vedere alcun OutputFormat all'interno AvroMultipleOutputs cui committer uscita utilizza

+0

hai scritto una propria implementazione? ho la stessa domanda. – tesnik03

+0

W Quando dici "Con l'esecuzione speculativa questo causa (potrebbe causare) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException", hai visto questo documento ovunque, o stai parlando per esperienza. Stiamo assistendo allo stesso comportamento ma non abbiamo trovato riferimenti espliciti per disabilitare l'esecuzione speculativa quando si utilizzano più output. – ioss

+0

Sì, è documentato. C'è un avvertimento a riguardo qui http://archive.cloudera.com/cdh5/cdh/5/hadoop/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html – bl3e

risposta

1

AvroMultipleOutputs useranno il OutputFormat cui si è registrati a configurazioni di lavoro, mentre l'aggiunta di nome di uscita ad esempio utilizzando l'API addNamedOutput da AvroMultipleOutputs (ad esempio AvroKeyValueOutputFormat).

Con AvroMultipleOutputs, potrebbe non essere possibile utilizzare la funzione di esecuzione dell'attività speculativa. Anche ignorarlo non aiuterebbe o non sarebbe semplice.

Invece si dovrebbe scrivere il proprio OutputFormat (molto probabilmente si estende uno dei formati disponibili Avro di output per esempio AvroKeyValueOutputFormat), e di override/attuare la sua getRecordWriter API, dove sarebbe tornare un RecordWriter esempio dire MainRecordWriter (solo per riferimento).

Questo MainRecordWriter manterrebbe una mappa delle istanze (ad esempio AvroKeyValueRecordWriter). Ognuna di queste istanze RecordWriter appartiene a uno dei file di output. Nell'API write di MainRecordWriter, si otterrà l'effettiva istanza RecordWriter dalla mappa (in base al record che si sta per scrivere) e si scriverà il record utilizzando questo registratore di record. Quindi MainRecordWriter funzionerebbe semplicemente come wrapper su più istanze RecordWriter.

Per un'implementazione simile, è possibile studiare il codice della classe MultiStorage dalla libreria piggybank.

0

Quando si aggiunge un output denominato per AvroMultipleOutputs, sarà chiamata sia AvroKeyOutputFormat.getRecordWriter() o AvroKeyValueOutputFormat.getRecordWriter(), che chiamano AvroOutputFormatBase.getAvroFileOutputStream(), il cui contenuto è

protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException { 
    Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(), 
    getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT)); 
    return path.getFileSystem(context.getConfiguration()).create(path); 
} 

E AvroOutputFormatBase estende FileOutputFormat (il getOutputCommitter() nel metodo di cui sopra è in realtà una chiamare per FileOutputFormat.getOutputCommitter(). Quindi, AvroMultipleOutputs dovrebbe avere gli stessi vincoli come MultipleOutputs.

Problemi correlati