Partendo dal presupposto che siamo in grado di accedere ai dati molto più rapidamente se eseguiamo direttamente da HDFS anziché utilizzare l'API HBase, stiamo cercando di creare un RDD basato su una tabella Snapshot di HBase.Istantanee Spark e HBase
Quindi, ho uno snapshot chiamato "dm_test_snap". Mi sembra di essere in grado di far funzionare la maggior parte delle cose di configurazione, ma il mio RDD è nullo (nonostante ci siano dati nello stesso Snapshot).
Sto passando un sacco di tempo a trovare un esempio di chiunque faccia analisi offline delle istantanee di HBase con Spark, ma non posso credere di essere solo nel tentativo di farlo funzionare. Qualsiasi aiuto o suggerimento è molto apprezzato.
Ecco un frammento del mio codice:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
val scan = new Scan
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
}
aggiornamento per includere la soluzione Il trucco è stato, come accennato @Holden di seguito, il conf non era sempre passato attraverso. Per rimediare a questo, sono stato in grado di farlo funzionare modificando questa la chiamata a newAPIHadoopRDD a questo:
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
c'era un secondo problema che è stato evidenziato anche dalla risposta @ di Victor, che non passavo in una scansione . Per rimediare, ho aggiunto questa linea e il metodo:
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
Questo permetterà anche di me tirare fuori questa linea dai comandi conf.set:
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
* NOTA: Questo è stato per HBase versione 0.96. 1.1 su CDH5.0
codice finale completo per una facile consultazione:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
val scan = new Scan
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
}
Capisco che, l'unico motivo per utilizzare snapshot anziché tavolo HBase reale, è quello di accelerare il processo. Tuttavia, è necessario considerare, da dove RDD sta leggendo quando si utilizza la tabella Hbase. Come i file HLog o qualsiasi altro. E una volta confermato l'aspetto, l'istantanea e la tabella effettiva sono simili nell'aspetto precedente. Abbiamo riscontrato problemi simili con l'integrazione del framework esterno con hbase.Tutto funziona bene, se si passa per approccio tradizionale. Qualcosa di nuovo per tagliare il tempo, il quadro aveva alcune limitazioni. – Ramzy
Mi aspetterei di accedere direttamente alle HFiles tramite l'istantanea tramite HDFS e il guadagno sarebbe semplicemente nello streaming dei dati in un RDD direttamente dal disco, passando tutte le chiamate a HBase. – dmcnelis
Uno snapshot consiste nel fare riferimento ai file presenti nella tabella nel momento in cui viene eseguita l'istantanea. Nessuna copia dei dati viene eseguita durante l'operazione di istantanea, ma è possibile che vengano eseguite delle copie quando viene attivata una compressione o una cancellazione. Quindi il nuovo metodoAPIHadoopRDD() deve avere una logica extra per recuperare gli HFiles effettivi dallo snapshot, piuttosto che il normale look up al file hadoop/hbase. È necessario confermare questo comportamento a livello di RDD – Ramzy