2012-06-16 12 views
8

Ho un processo haddop che il suo output dovrebbe essere scritto in HBase. Non ho davvero bisogno di riduttore, il tipo di riga che vorrei inserire è determinato nel Mapper.Hadoop - Scrittura in HBase direttamente dal Mapper

Come posso utilizzare TableOutputFormat per ottenere questo risultato? Da tutti gli esempi che ho visto, il presupposto è che il riduttore è quello che crea il Put, e che TableMapper è solo per la lettura dalla tabella HBase.

Nel mio caso l'input è HDFS l'output è Put to tabella specifica, non riesco a trovare nulla in TableMapReduceUtil che possa aiutarmi con quello.

C'è qualche esempio là fuori che può aiutarmi con quello?

BTW, sto usando la nuova API Hadoop

+0

quanti dischi stai cercando di inserire? – Gevorg

risposta

1

Hai solo bisogno di rendere l'output mapper la coppia. OutputFormat specifica solo come mantenere i valori-chiave di output. Non significa necessariamente che i valori chiave derivino dal riduttore. Si avrebbe bisogno di fare qualcosa di simile nel mapper:

... extends TableMapper<ImmutableBytesWritable, Put>() { 
    ... 
    ... 
    context.write(<some key>, <some Put or Delete object>); 
} 
7

Questo è l'esempio di lettura dal file e mettere tutte le linee in HBase. Questo esempio è tratto da "Hbase: la guida definitiva" e puoi trovarlo sul repository. Per farlo basta clonare pronti contro termine sul computer:

git clone git://github.com/larsgeorge/hbase-book.git 

In questo libro si possono anche trovare tutte le spiegazioni sul codice. Ma se qualcosa è incomprensibile per te, sentiti libero di chiedere.

` public class ImportFromFile { 
    public static final String NAME = "ImportFromFile"; 
    public enum Counters { LINES } 

    static class ImportMapper 
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
     private byte[] family = null; 
     private byte[] qualifier = null; 

     @Override 
     protected void setup(Context context) 
     throws IOException, InterruptedException { 
     String column = context.getConfiguration().get("conf.column"); 
     byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); 
     family = colkey[0]; 
     if (colkey.length > 1) { 
      qualifier = colkey[1]; 
     } 
     } 

     @Override 
     public void map(LongWritable offset, Text line, Context context) 
     throws IOException { 
      try { 
      String lineString = line.toString(); 
      byte[] rowkey = DigestUtils.md5(lineString); 
      Put put = new Put(rowkey); 
      put.add(family, qualifier, Bytes.toBytes(lineString)); 
      context.write(new ImmutableBytesWritable(rowkey), put); 
      context.getCounter(Counters.LINES).increment(1); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     } 
    } 

    private static CommandLine parseArgs(String[] args) throws ParseException { 
     Options options = new Options(); 
     Option o = new Option("t", "table", true, 
     "table to import into (must exist)"); 
     o.setArgName("table-name"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("c", "column", true, 
     "column to store row data into (must exist)"); 
     o.setArgName("family:qualifier"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("i", "input", true, 
     "the directory or file to read from"); 
     o.setArgName("path-in-HDFS"); 
     o.setRequired(true); 
     options.addOption(o); 
     options.addOption("d", "debug", false, "switch on DEBUG log level"); 
     CommandLineParser parser = new PosixParser(); 
     CommandLine cmd = null; 
     try { 
     cmd = parser.parse(options, args); 
     } catch (Exception e) { 
     System.err.println("ERROR: " + e.getMessage() + "\n"); 
     HelpFormatter formatter = new HelpFormatter(); 
     formatter.printHelp(NAME + " ", options, true); 
     System.exit(-1); 
     } 
     return cmd; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = HBaseConfiguration.create(); 
     String[] otherArgs = 
     new GenericOptionsParser(conf, args).getRemainingArgs(); 
     CommandLine cmd = parseArgs(otherArgs); 
     String table = cmd.getOptionValue("t"); 
     String input = cmd.getOptionValue("i"); 
     String column = cmd.getOptionValue("c"); 
     conf.set("conf.column", column); 
     Job job = new Job(conf, "Import from file " + input + " into table " + table); 

      job.setJarByClass(ImportFromFile.class); 
     job.setMapperClass(ImportMapper.class); 
     job.setOutputFormatClass(TableOutputFormat.class); 
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); 
     job.setOutputKeyClass(ImmutableBytesWritable.class); 
     job.setOutputValueClass(Writable.class); 
     job.setNumReduceTasks(0); 
     FileInputFormat.addInputPath(job, new Path(input)); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    }` 
+1

Sto ottenendo il seguente: 'Eccezione dal lancio del contenitore: org.apache.hadoop.util.Shell $ ExitCodeException' Hai avuto questo problema anche con il codice sopra? Sto usando Hadoop2.4 e Hbase0.94.18 – Gevorg

Problemi correlati