2010-08-16 8 views
16

Sono un principiante in Hadoop. Sto provando il programma Wordcount.MultipleOutputFormat in hadoop

Ora per provare più file di output, io uso MultipleOutputFormat. questo collegamento mi ha aiutato a farlo. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

nella mia classe di driver ho avuto

MultipleOutputs.addNamedOutput(conf, "even", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class); 

    MultipleOutputs.addNamedOutput(conf, "odd", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class);` 

e il mio ridurre classe è diventato questo

public static class Reduce extends MapReduceBase implements 
     Reducer<Text, IntWritable, Text, IntWritable> { 
    MultipleOutputs mos = null; 

    public void configure(JobConf job) { 
     mos = new MultipleOutputs(job); 
    } 

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException { 
     int sum = 0; 
     while (values.hasNext()) { 
      sum += values.next().get(); 
     } 
     if (sum % 2 == 0) { 
      mos.getCollector("even", reporter).collect(key, new IntWritable(sum)); 
     }else { 
      mos.getCollector("odd", reporter).collect(key, new IntWritable(sum)); 
     } 
     //output.collect(key, new IntWritable(sum)); 
    } 
    @Override 
    public void close() throws IOException { 
     // TODO Auto-generated method stub 
    mos.close(); 
    } 
} 

le cose funzionavano, ma ho molti file, (uno dispari e uno anche per ogni mappa -rispetto)

La domanda è: come posso avere solo 2 file di output (pari a & pari) in modo che ogni uscita dispari di ogni mappa-riduzione venga scritta in quello strano file, e lo stesso per pari.

+5

non si utilizza MultipleOutputs MultipleOutputFormat. Entrambe sono librerie diverse. –

risposta

3

Ogni riduttore utilizza un OutputFormat per scrivere record in. Ecco perché ottieni un set di file pari e dispari per riduttore. Questo è di progettazione in modo che ogni riduttore può eseguire scritture in parallelo.

Se si desidera solo un singolo file pari e singolo, sarà necessario impostare mapred.reduce.tasks su 1. Ma le prestazioni ne risentiranno, poiché tutti i mappatori verranno inseriti in un unico riduttore.

Un'altra opzione è modificare il processo in cui legge questi file per accettare più file di input o scrivere un processo separato che unisce questi file.

+3

insttead di cambiare le attività della mappa rossa, ho scavalcato la funzione getFilenameForKeyValue() .. e questo ha funzionato ..... grazie. – raj

1

Più file di output saranno generati in base al numero di riduttori.

È possibile utilizzare DFS Hadoop -getmerge alle uscite unite

+0

grazie :) ma ho bisogno di farlo solo con la mappa, – raj

3

ho scritto una classe per fare questo. basta usare il tuo lavoro:

job.setOutputFormatClass(m_customOutputFormatClass); 

Questa è la mia classe:

import java.io.IOException; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Map.Entry; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

/** 
* TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br> 
* <p> 
* <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an 
* {@link RecordWriter} instance per folder name. 
* </p> 
* <p> 
* In this class the folder name is defined by the written entry's key.<br> 
* To change this behavior simply extend this class and override the 
* {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own 
* {@link FolderNameExtractor} implementation. 
* </p> 
* 
* 
* @author ykesten 
* 
* @param <K> - Keys type 
* @param <V> - Values type 
*/ 
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> { 

    private String folderName; 

    private class MultipleFilesRecordWriter extends RecordWriter<K, V> { 

     private Map<String, RecordWriter<K, V>> fileNameToWriter; 
     private FolderNameExtractor<K, V> fileNameExtractor; 
     private TaskAttemptContext job; 

     public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) { 
      fileNameToWriter = new HashMap<String, RecordWriter<K, V>>(); 
      this.fileNameExtractor = fileNameExtractor; 
      this.job = job; 
     } 

     @Override 
     public void write(K key, V value) throws IOException, InterruptedException { 
      String fileName = fileNameExtractor.extractFolderName(key, value); 
      RecordWriter<K, V> writer = fileNameToWriter.get(fileName); 
      if (writer == null) { 
       writer = createNewWriter(fileName, fileNameToWriter, job); 
       if (writer == null) { 
        throw new IOException("Unable to create writer for path: " + fileName); 
       } 
      } 
      writer.write(key, value); 
     } 

     @Override 
     public void close(TaskAttemptContext context) throws IOException, InterruptedException { 
      for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) { 
       entry.getValue().close(context); 
      } 
     } 

    } 

    private synchronized RecordWriter<K, V> createNewWriter(String folderName, 
      Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) { 
     try { 
      this.folderName = folderName; 
      RecordWriter<K, V> writer = super.getRecordWriter(job); 
      this.folderName = null; 
      fileNameToWriter.put(folderName, writer); 
      return writer; 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 

    @Override 
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { 
     Path path = super.getDefaultWorkFile(context, extension); 
     if (folderName != null) { 
      String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName(); 
      path = new Path(newPath); 
     } 
     return path; 
    } 

    @Override 
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { 
     return new MultipleFilesRecordWriter(getFolderNameExtractor(), job); 
    } 

    public FolderNameExtractor<K, V> getFolderNameExtractor() { 
     return new KeyFolderNameExtractor<K, V>(); 
    } 

    public interface FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value); 
    } 

    private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value) { 
      return key.toString(); 
     } 
    } 

}