2015-06-10 13 views
7

Partendo dal presupposto che siamo in grado di accedere ai dati molto più rapidamente se eseguiamo direttamente da HDFS anziché utilizzare l'API HBase, stiamo cercando di creare un RDD basato su una tabella Snapshot di HBase.Istantanee Spark e HBase

Quindi, ho uno snapshot chiamato "dm_test_snap". Mi sembra di essere in grado di far funzionare la maggior parte delle cose di configurazione, ma il mio RDD è nullo (nonostante ci siano dati nello stesso Snapshot).

Sto passando un sacco di tempo a trovare un esempio di chiunque faccia analisi offline delle istantanee di HBase con Spark, ma non posso credere di essere solo nel tentativo di farlo funzionare. Qualsiasi aiuto o suggerimento è molto apprezzato.

Ecco un frammento del mio codice:

object TestSnap { 
    def main(args: Array[String]) { 
    val config = ConfigFactory.load() 
    val hbaseRootDir = config.getString("hbase.rootdir") 
    val sparkConf = new SparkConf() 
     .setAppName("testnsnap") 
     .setMaster(config.getString("spark.app.master")) 
     .setJars(SparkContext.jarOfObject(this)) 
     .set("spark.executor.memory", "2g") 
     .set("spark.default.parallelism", "160") 

    val sc = new SparkContext(sparkConf) 

    println("Creating hbase configuration") 
    val conf = HBaseConfiguration.create() 

    conf.set("hbase.rootdir", hbaseRootDir) 
    conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum")) 
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout")) 
    conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap") 

    val scan = new Scan 
    val job = Job.getInstance(conf) 

    TableSnapshotInputFormat.setInput(job, "dm_test_snap", 
     new Path("hdfs://nameservice1/tmp")) 

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

    hBaseRDD.count() 

    System.exit(0) 
    } 

} 

aggiornamento per includere la soluzione Il trucco è stato, come accennato @Holden di seguito, il conf non era sempre passato attraverso. Per rimediare a questo, sono stato in grado di farlo funzionare modificando questa la chiamata a newAPIHadoopRDD a questo:

val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

c'era un secondo problema che è stato evidenziato anche dalla risposta @ di Victor, che non passavo in una scansione . Per rimediare, ho aggiunto questa linea e il metodo:

conf.set(TableInputFormat.SCAN, convertScanToString(scan)) 

def convertScanToString(scan : Scan) = { 
     val proto = ProtobufUtil.toScan(scan); 
     Base64.encodeBytes(proto.toByteArray()); 
    } 

Questo permetterà anche di me tirare fuori questa linea dai comandi conf.set:

conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap") 

* NOTA: Questo è stato per HBase versione 0.96. 1.1 su CDH5.0

codice finale completo per una facile consultazione:

object TestSnap { 
    def main(args: Array[String]) { 
    val config = ConfigFactory.load() 
    val hbaseRootDir = config.getString("hbase.rootdir") 
    val sparkConf = new SparkConf() 
     .setAppName("testnsnap") 
     .setMaster(config.getString("spark.app.master")) 
     .setJars(SparkContext.jarOfObject(this)) 
     .set("spark.executor.memory", "2g") 
     .set("spark.default.parallelism", "160") 

    val sc = new SparkContext(sparkConf) 

    println("Creating hbase configuration") 
    val conf = HBaseConfiguration.create() 

    conf.set("hbase.rootdir", hbaseRootDir) 
    conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum")) 
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout")) 
    val scan = new Scan 
    conf.set(TableInputFormat.SCAN, convertScanToString(scan)) 

    val job = Job.getInstance(conf) 

    TableSnapshotInputFormat.setInput(job, "dm_test_snap", 
     new Path("hdfs://nameservice1/tmp")) 

    val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

    hBaseRDD.count() 

    System.exit(0) 
    } 

    def convertScanToString(scan : Scan) = { 
     val proto = ProtobufUtil.toScan(scan); 
     Base64.encodeBytes(proto.toByteArray()); 
    } 

} 
+0

Capisco che, l'unico motivo per utilizzare snapshot anziché tavolo HBase reale, è quello di accelerare il processo. Tuttavia, è necessario considerare, da dove RDD sta leggendo quando si utilizza la tabella Hbase. Come i file HLog o qualsiasi altro. E una volta confermato l'aspetto, l'istantanea e la tabella effettiva sono simili nell'aspetto precedente. Abbiamo riscontrato problemi simili con l'integrazione del framework esterno con hbase.Tutto funziona bene, se si passa per approccio tradizionale. Qualcosa di nuovo per tagliare il tempo, il quadro aveva alcune limitazioni. – Ramzy

+0

Mi aspetterei di accedere direttamente alle HFiles tramite l'istantanea tramite HDFS e il guadagno sarebbe semplicemente nello streaming dei dati in un RDD direttamente dal disco, passando tutte le chiamate a HBase. – dmcnelis

+0

Uno snapshot consiste nel fare riferimento ai file presenti nella tabella nel momento in cui viene eseguita l'istantanea. Nessuna copia dei dati viene eseguita durante l'operazione di istantanea, ma è possibile che vengano eseguite delle copie quando viene attivata una compressione o una cancellazione. Quindi il nuovo metodoAPIHadoopRDD() deve avere una logica extra per recuperare gli HFiles effettivi dallo snapshot, piuttosto che il normale look up al file hadoop/hbase. È necessario confermare questo comportamento a livello di RDD – Ramzy

risposta

3

Guardando le informazioni di lavoro, la sua makin g una copia dell'oggetto conf che stai fornendo ad esso (The Job makes a copy of the Configuration so that any necessary internal modifications do not reflect on the incoming parameter.) quindi molto probabilmente le informazioni che devi impostare sull'oggetto conf non vengono passate a Spark. Potresti invece usare TableSnapshotInputFormatImpl che ha un metodo simile che funziona su oggetti conf. Potrebbero esserci altre cose necessarie, ma a prima vista il problema sembra essere la causa più probabile.

Come indicato nei commenti, un'altra opzione è utilizzare job.getConfiguration per ottenere la configurazione aggiornata dall'oggetto processo.

+0

La soluzione consisteva nel modificare la nuova istanzaAPIHadoopRDD per utilizzare job.getConfiguration invece di passare nell'oggetto conf creato in precedenza .... nella versione 0.96 di HBase TableSnapshotInputFormatImpl non è disponibile, fwiw. – dmcnelis

+1

Ah bello, penso che per le versioni in cui quella classe è disponibile potrebbe funzionare anche, ma ottenere il conf dall'oggetto di lavoro sembra anche un'ottima soluzione. Aggiornerò la risposta per avere entrambi se è bello. – Holden

1

Non è stato configurato il vostro lavoro M/R correttamente: Questo è ad esempio in Java su come configurare M/R sopra istantanee:

Job job = new Job(conf); 
Scan scan = new Scan(); 
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, 
     scan, MyTableMapper.class, MyMapKeyOutput.class, 
     MyMapOutputValueWritable.class, job, true); 
} 

Si, sicuramente, saltato Scan. Suggerisco di dare un'occhiata all'applicazione initTableSnapshotMapperJob di TableMapReduceUtil per avere un'idea di come configurare il lavoro in Spark/Scala.

+0

Manca un parametro restorePath. – SUDARSHAN

0

Ecco completa configurazione in MapReduce Java

TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, // Name of the snapshot 
       scan, // Scan instance to control CF and attribute selection 
       DefaultMapper.class, // mapper class 
       NullWritable.class, // mapper output key 
       Text.class, // mapper output value 
       job, 
       true, 
       restoreDir);