2015-04-09 7 views
8

Attualmente ho due tabelle Hbase (le chiamiamo tableA e tableB). Utilizzando un lavoro MapReduce a una sola fase, i dati in tableA vengono letti e elaborati e salvati in tableB. Attualmente entrambe le tabelle risiedono nello stesso cluster HBase. Tuttavia, ho bisogno di spostare tableB al suo sul cluster.Come posso leggere da un'istanza di HBase ma scrivere ad un altro?

E 'possibile configurare una mappa a una sola fase ridurre il lavoro in Hadoop per leggere e scrivere da istanze separate di HBase?

+0

È possibile utilizzare la scintilla per questo tipo di lavoro. – Tinku

risposta

3

E 'possibile, HBase di CopyTable MapReduce job lo fa utilizzando TableMapReduceUtil.initTableReducerJob() che permette di impostare un quorumAddress alternativa nel caso in cui è necessario scrivere a cluster remoti:

public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, org.apache.hadoop.mapreduce.Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl) 

quorumAddress - grappolo Distante da scrivere a; l'impostazione predefinita è nulla per l'output nel cluster designato in hbase-site.xml. Impostare questa stringa sull'insieme di zookeeper di un cluster remoto alternativo quando si avrà la scrittura in scrittura di un cluster diverso dal valore predefinito ; per esempio. copiando tabelle tra cluster, la fonte sarebbe designata da hbase-site.xml e questo parametro avrebbe l'indirizzo completo del cluster remoto. Il formato da passare è particolare. Passa al numero :: come server, server2, server3: 2181:/hbase.


Un'altra opzione è quella di implementare il proprio riduttore personalizzato per scrivere nella tabella remota invece di scrivere al contesto. Qualcosa di simile a questo:

public static class MyReducer extends Reducer<Text, Result, Text, Text> { 

    protected Table remoteTable; 
    protected Connection connection; 

    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
     super.setup(context); 
     // Clone configuration and provide a new quorum address for the remote cluster 
     Configuration config = HBaseConfiguration.create(context.getConfiguration()); 
     config.set("hbase.zookeeper.quorum","quorum1,quorum2,quorum3"); 
     connection = ConnectionFactory.createConnection(config); // HBase 0.99+ 
     //connection = HConnectionManager.createConnection(config); // HBase <0.99 
     remoteTable = connection.getTable("myTable".getBytes()); 
     remoteTable.setAutoFlush(false); 
     remoteTable.setWriteBufferSize(1024L*1024L*10L); // 10MB buffer 
    } 

    public void reduce(Text boardKey, Iterable<Result> results, Context context) throws IOException, InterruptedException { 
     /* Write puts to remoteTable */ 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, InterruptedException { 
     super.cleanup(context); 
     if (remoteTable!=null) { 
      remoteTable.flushCommits(); 
      remoteTable.close(); 
     } 
     if(connection!=null) { 
      connection.close(); 
     } 
    } 
} 
+0

Devi amare le persone che downvotes risposte perfettamente valide solo per divertimento. –

+0

Grazie per aver fornito un esempio di riduzione personalizzata – slayton

Problemi correlati