2012-06-14 4 views
8

Sto pensando di utilizzare HBase come sorgente per uno dei miei lavori MapReduce. So che TableInputFormat specifica una divisione di input (e quindi un mappatore) per Regione. Tuttavia, questo sembra inefficiente. Mi piacerebbe molto avere più mappatori che lavorano su una data regione contemporaneamente. Posso ottenere questo risultato estendendo TableInputFormatBase? Puoi per favore indicarmi un esempio? Inoltre, questa è anche una buona idea?Quando si utilizza HBase come sorgente per MapReduce, è possibile estendere TableInputFormatBase per creare più suddivisioni e più mapper per ciascuna regione?

Grazie per l'aiuto.

risposta

1

Non sono sicuro se è possibile specificare più mapper per una data regione, ma si consideri il seguente:

Se si pensa che un mapper è inefficiente per regione (forse i nodi di dati non hanno abbastanza risorse come #cpus) , puoi forse specificare le dimensioni delle regioni più piccole nel file hbase-site.xml.

ecco un sito per le opzioni configs di default se si vuole prendere in considerazione la modifica che: http://hbase.apache.org/configuration.html#hbase_default_configurations

si ricorda che, rendendo la dimensione della regione piccola, vi sarà l'aumento del numero di file nella DFS, e questo può limitare la capacità del tuo DFS hadoop a seconda della memoria del tuo namenode. Ricorda, l'utilizzo della memoria del namenode è direttamente correlato al numero di file nel tuo DFS. Questo può o meno essere rilavante per la tua situazione in quanto non so come viene utilizzato il tuo cluster. Non c'è mai una risposta pallottola d'argento a queste domande!

-1

Questo sarebbe utile se si desidera eseguire la scansione di aree di grandi dimensioni (centinaia di milioni di righe) con scansione condizionale che trova solo alcuni record. Ciò impedirà ScannerTimeoutException

package org.apache.hadoop.hbase.mapreduce; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 

public class RegionSplitTableInputFormat extends TableInputFormat { 

    public static final String REGION_SPLIT = "region.split"; 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 

     Configuration conf = context.getConfiguration(); 
     int regionSplitCount = conf.getInt(REGION_SPLIT, 0); 
     List<InputSplit> superSplits = super.getSplits(context); 
     if (regionSplitCount <= 0) { 
      return superSplits; 
     } 

     List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount); 

     for (InputSplit inputSplit : superSplits) { 
      TableSplit tableSplit = (TableSplit) inputSplit; 
      System.out.println("splitting by " + regionSplitCount + " " + tableSplit); 
      byte[] startRow0 = tableSplit.getStartRow(); 
      byte[] endRow0 = tableSplit.getEndRow(); 
      boolean discardLastSplit = false; 
      if (endRow0.length == 0) { 
       endRow0 = new byte[startRow0.length]; 
       Arrays.fill(endRow0, (byte) 255); 
       discardLastSplit = true; 
      } 
      byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount); 
      if (discardLastSplit) { 
       split[split.length - 1] = new byte[0]; 
      } 
      for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) { 
       byte[] startRow = split[regionSplit]; 
       byte[] endRow = split[regionSplit + 1]; 
       TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow, 
         tableSplit.getLocations()[0]); 
       splits.add(newSplit); 
      } 
     } 

     return splits; 
    } 
} 
0

1. È assolutamente buono, basta assicurarsi che il set di chiavi si escluda a vicenda tra i mappatori.

  1. arent creare troppi clienti come questo può portare a molti gc, come durante HBase leggere HBase cache di blocco zangolatura accade
0

Usando questo MultipleScanTableInputFormat, è possibile utilizzare Configurazione MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER per controllare quanti mappatori devono essere eseguiti contro un singolo server delle regioni. La classe raggrupperà tutte le divisioni di input in base alla loro posizione (regionerver) e RecordReader eseguirà correttamente tutte le divisioni aggregate per il programma di mapping.

Ecco l'esempio

https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90

che il lavoro che hai creato le molteplici scissioni aggregati per un singolo mapper

private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException { 
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>(); 

final Scan scan = getScan(); 

for (int i = 0; i < startRows.size(); i++) { 
    scan.setStartRow(startRows.get(i)); 
    scan.setStopRow(stopRows.get(i)); 

    setScan(scan); 

    aggregatedSplits.addAll(super.getSplits(context)); 
} 

// set the state back to where it was.. 
scan.setStopRow(null); 
scan.setStartRow(null); 

setScan(scan); 

return aggregatedSplits; 
} 

Crea partizione server Regione

@Override 
public List<InputSplit> getSplits(JobContext context) throws IOException { 
List<InputSplit> source = getAggregatedSplits(context); 

if (!partitionByRegionServer) { 
    return source; 
} 

// Partition by regionserver 
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create(); 
for (InputSplit split : source) { 
    TableSplit cast = (TableSplit) split; 
    String rs = cast.getRegionLocation(); 

    partitioned.put(rs, cast); 
} 
Problemi correlati