2014-12-08 18 views
5

Sto usando hadoop map reduce e voglio calcolare due file. Il mio primo Map/Reduce iterazione sono io un dando un file con un numero di coppie ID come questo:Hadoop input multipli

A 30 
D 20 

Il mio obiettivo è quello di utilizzare tale ID dal file da associare a un altro file e hanno un'altra uscita con un trio: ID, numero, nome, in questo modo:

A ABC 30 
D EFGH 20 

Ma io non sono sicuro se utilizzando Map Reduce è il modo migliore per farlo. Sarebbe meglio ad esempio utilizzare un lettore di file per leggere il secondo file di input e ottenere il nome per ID? O posso farlo con Map Reduce?

Se è così, sto cercando di scoprire come. Ho cercato una soluzione MultipleInput:

MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"), 
    TextInputFormat.class, FlightsByCarrierMapper2.class); 
MultipleInputs.addInputPath(job2, new Path("inputplanes"), 
    TextInputFormat.class, FlightsModeMapper.class); 

Ma non riesco a pensare a qualsiasi soluzione per combinare le due cose e ottenere il risultato che voglio. Il modo in cui ho in questo momento è solo mi dà la lista come in questo esempio:

A ABC 
A 30 
B ABCD 
C ABCDEF 
D EFGH 
D 20 

dopo il mio ultimo ridurre sto ottenendo questo:

N125DL 767-332 
N125DL 7 , 
N126AT 737-76N 
N126AT 19 , 
N126DL 767-332 
N126DL 1 , 
N127DL 767-332 
N127DL 7 , 
N128DL 767-332 
N128DL 3 

voglio che questo: N127DL 7 767-332. E inoltre, non voglio quelli che non si combinano.

E questa è la mia classe di ridurre:

public class FlightsByCarrierReducer2 estende Reducer {

String merge = ""; 
protected void reduce(Text token, Iterable<Text> values, Context context) 
          throws IOException, InterruptedException { 

    int i = 0; 
    for(Text value:values) 
    { 
     if(i == 0){ 
      merge = value.toString()+","; 
     } 
     else{ 
      merge += value.toString(); 
     } 
     i++; 
    } 

     context.write(token, new Text(merge)); 

} 

}

Aggiornamento:

http://stat-computing.org/dataexpo/2009/the-data.html questo è l'esempio che sto utilizzando .

Sto provando con: TailNum e Annullato che è (1 o 0) ottenere il nome del modello che corrisponde al TailNum. Il mio file con modello ha un TailNumb, Model e altre cose. La mia uscita in corrente è:

N193JB ERJ 190-100 IGW

N194DN 767-332

N19503 EMB-135ER

N19554 EMB-145LR

N195DN 767-332

N195DN 2

Prima viene la chiave, seconda il modello, le chiavi che ha voli cancellati, apperas sotto il modello

E vorrei una chiave trio, Modello Numero di Annullato, perché voglio numero di cancellazioni per modello

+0

Quali sono le dimensioni previste di entrambi i file di input? – blackSmith

+0

il primo circa 600k voci, il secondo intorno a 2k – dex90

+1

dice che il secondo file ha una lunghezza della linea di 100 byte in media, quindi la dimensione totale sarà di circa 200k. Immagino che tu possa metterlo in "DistributedCache" per eseguire un join sul lato mappa e risparmiare un po 'di carburante ;-) – blackSmith

risposta

1

È possibile unirsi a loro con ID come chiave per entrambi i mapper. scrivi tu compito mappa come qualcosa di simile

public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException 
{ 
    //Get the line 
    //split the line to get ID seperate 
    //word1 = A 
    //word2 = 30 
       //Likewise for A ABC 
        //word1 = A 
        //word2 = ABC 
    context.write(word1, word2); 
} 

Penso che si può resuse lo stesso compito Map. E quindi scrivere un commando di ridondanza dove Hadoop Framework raggruppa i dati in base alle chiavi. Quindi sarai in grado di ottenere l'ID come chiave. E È possibile memorizzare nella cache uno dei valori e quindi concatenare.

String merge = ""; 
public void reduce(Text key, Iterable<Text> values, Context context) 
{ 
    int i =0; 
    for(Text value:values) 
    { 
     if(i == 0){ 
      merge = value.toString()+","; 
     } 
     else{ 
      merge += value.toString(); 
     } 
     i++; 
    } 
    valEmit.set(merge); 
    context.write(key, valEmit); 
} 

Infine è possibile scrivere la classe del driver

public int run(String[] args) throws Exception { 
Configuration c=new Configuration(); 
String[] files=new GenericOptionsParser(c,args).getRemainingArgs(); 
Path p1=new Path(files[0]); 
Path p2=new Path(files[1]); 
Path p3=new Path(files[2]); 
FileSystem fs = FileSystem.get(c); 
if(fs.exists(p3)){ 
    fs.delete(p3, true); 
    } 
Job job = new Job(c,"Multiple Job"); 
job.setJarByClass(MultipleFiles.class); 
MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class); 
MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class); 
job.setReducerClass(MultipleReducer.class); 
. 
. 
} 

È possibile trovare l'esempio HERE

Spero che questo aiuti.


UPDATE

Input1

A 30 
D 20 

Input2

A ABC 
D EFGH 

Out mettere

A ABC 30 
D EFGH 20 

Mapper.java

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

/** 
* @author sreeveni 
* 
*/ 
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> { 
    Text keyEmit = new Text(); 
    Text valEmit = new Text(); 

    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     String line = value.toString(); 
     String parts[] = line.split(" "); 
     keyEmit.set(parts[0]); 
     valEmit.set(parts[1]); 
     context.write(keyEmit, valEmit); 
    } 
} 

Reducer.java

import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

/** 
* @author sreeveni 
* 
*/ 
public class ReducerJoin extends Reducer<Text, Text, Text, Text> { 

    Text valEmit = new Text(); 
    String merge = ""; 

    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     String character = ""; 
     String number = ""; 
     for (Text value : values) { 
      // ordering output 
      String val = value.toString(); 
      char myChar = val.charAt(0); 

      if (Character.isDigit(myChar)) { 
       number = val; 
      } else { 
       character = val; 
      } 
     } 
     merge = character + " " + number; 
     valEmit.set(merge); 
     context.write(key, valEmit); 
    } 

} 

classe del driver

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

/** 
* @author sreeveni 
* 
*/ 
public class Driver extends Configured implements Tool { 
    public static void main(String[] args) throws Exception { 
     // TODO Auto-generated method stub 
     // checking the arguments count 

     if (args.length != 3) { 
      System.err 
        .println("Usage : <inputlocation> <inputlocation> <outputlocation> "); 
      System.exit(0); 
     } 
     int res = ToolRunner.run(new Configuration(), new Driver(), args); 
     System.exit(res); 

    } 

    @Override 
    public int run(String[] args) throws Exception { 
     // TODO Auto-generated method stub 
     String source1 = args[0]; 
     String source2 = args[1]; 
     String dest = args[2]; 
     Configuration conf = new Configuration(); 
     conf.set("mapred.textoutputformat.separator", " "); // changing default 
                  // delimiter to user 
                  // input delimiter 
     FileSystem fs = FileSystem.get(conf); 
     Job job = new Job(conf, "Multiple Jobs"); 

     job.setJarByClass(Driver.class); 
     Path p1 = new Path(source1); 
     Path p2 = new Path(source2); 
     Path out = new Path(dest); 
     MultipleInputs.addInputPath(job, p1, TextInputFormat.class, 
       Mapper1.class); 
     MultipleInputs.addInputPath(job, p2, TextInputFormat.class, 
       Mapper1.class); 
     job.setReducerClass(ReducerJoin.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     job.setOutputFormatClass(TextOutputFormat.class); 

     /* 
     * delete if exist 
     */ 
     if (fs.exists(out)) 
      fs.delete(out, true); 

     TextOutputFormat.setOutputPath(job, out); 
     boolean success = job.waitForCompletion(true); 

     return success ? 0 : 1; 
    } 

} 
+0

Penso di essere più vicino, ma non sto ottenendo l'output corretto, non so perché, appena aggiornato la domanda – dex90

0

Il riduttore ha un metodo mappa, ma dovrebbe avere un metodo di riduzione che accetta una raccolta di valori Iterable che quindi si uniscono. Poiché non si dispone di un metodo reduce(), si ottiene il comportamento predefinito che consiste nel passare tutte le coppie chiave/valore.

+0

il nome del metodo è sbagliato ... Avevo già notato quell'errore. Ma non fa la differenza. Ad ogni modo avevo già provato con Iterable Collection, ma non funzionava. Pubblicherò comunque il mio attuale Reducer. – dex90

+0

Aggiungi il flag @ Override al metodo per forzare il compilatore ad assicurarsi di averlo sovrascritto correttamente. –