2013-01-13 16 views
5

Mi sono guardato intorno per giorni cercando di trovare un modo utilizzando dati ridotti per un'ulteriore mappatura in hadoop. Ho oggetti della classe A come dati di input e oggetti della classe B come dati di output. Il problema è che durante la mappatura non vengono generati solo i numeri B ma anche i nuovi A s.Dividi dati ridotti in output e nuovo input in Hadoop

Ecco cosa mi piacerebbe realizzare:

1.1 input: a list of As 
1.2 map result: for each A a list of new As and a list of Bs is generated 
1.3 reduce: filtered Bs are saved as output, filtered As are added to the map jobs 

2.1 input: a list of As produced by the first map/reduce 
2.2 map result: for each A a list of new As and a list of Bs is generated 
2.3 ... 

3.1 ... 

si dovrebbe ottenere l'idea di base.

Ho letto molto sul concatenamento, ma non sono sicuro di come combinare ChainReducer e ChainMapper o anche se questo sarebbe l'approccio giusto.

Quindi, ecco la mia domanda: Come posso dividere i dati mappati mentre si riduce a salvare una parte come output e l'altra parte come nuovi dati di input.

risposta

2

Provare a utilizzare MultipleOutputs. Come è Javadoc suggerisce:

I MultipleOutputs classe semplifica la scrittura dei dati di uscita a più uscite

caso uno: la scrittura di uscite supplementari diverse da quella predefinita lavoro uscita. Ogni output aggiuntivo o output denominato può essere configurato con con il proprio OutputFormat, con la propria classe di chiavi e con la propria classe di valore .

Caso due: per scrivere i dati in file diversi forniti dall'utente

modello di utilizzo per l'invio dei lavori:

Job job = new Job(); 

FileInputFormat.setInputPath(job, inDir); 
FileOutputFormat.setOutputPath(job, outDir); 

job.setMapperClass(MOMap.class); 
job.setReducerClass(MOReduce.class); 
... 

// Defines additional single text based output 'text' for the job 
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, 
LongWritable.class, Text.class); 

// Defines additional sequence-file based output 'sequence' for the job 
MultipleOutputs.addNamedOutput(job, "seq", 
    SequenceFileOutputFormat.class, 
    LongWritable.class, Text.class); 
... 

job.waitForCompletion(true); 
... 

Uso in riduttore:

String generateFileName(K k, V v) { 
    return k.toString() + "_" + v.toString(); 
} 

public class MOReduce extends 
    Reducer<WritableComparable, Writable,WritableComparable, Writable> { 
private MultipleOutputs mos; 
public void setup(Context context) { 
... 
mos = new MultipleOutputs(context); 
} 

public void reduce(WritableComparable key, Iterator<Writable> values, 
Context context) 
throws IOException { 
... 
mos.write("text", , key, new Text("Hello")); 
mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a"); 
mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b"); 
mos.write(key, new Text("value"), generateFileName(key, new Text("value"))); 
... 
} 

public void cleanup(Context) throws IOException { 
mos.close(); 
... 
} 

} 
+0

prega nota che questi esempi di codice sono per Hadoop 0. * ma non 1.0 .4. Poiché sto lavorando con 1.0.4, le interfacce sono leggermente cambiate. Ma l'idea di base era ciò che stavo cercando. Grazie! – Mennny

+0

sì è vero. questo era per 0.20 – Amar

Problemi correlati