Sto cercando di utilizzare Spark per elaborare i dati provenienti dalle tabelle HBase. This blog post fornisce un esempio di come utilizzare NewHadoopAPI
per leggere i dati da qualsiasi Hadoop InputFormat
.Arricchimento di SparkContext senza incorrere in problemi di serializzazione
Quello che ho fatto
Da quando ho bisogno di fare questo molte volte, stavo cercando di usare impliciti per arricchire SparkContext
, in modo che possa ottenere un RDD da un dato insieme di colonne in HBase. Ho scritto il seguente aiuto:
trait HBaseReadSupport {
implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)
implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}
final class HBaseSC(sc: SparkContext) extends Serializable {
def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
data map { case (cf, columns) =>
val content = columns map { column =>
val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)
column -> interpret(CellUtil.cloneValue(cell))
} toMap
cf -> content
}
def makeConf(table: String) = {
val conf = HBaseConfiguration.create()
conf.setBoolean("hbase.cluster.distributed", true)
conf.setInt("hbase.client.scanner.caching", 10000)
conf.set(TableInputFormat.INPUT_TABLE, table)
conf
}
def hbase[A](table: String, data: Map[String, List[String]])
(interpret: Array[Byte] => A) =
sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
Bytes.toString(key.get) -> extract(data, row, interpret)
}
}
Esso può essere utilizzato come
val rdd = sc.hbase[String](table, Map(
"cf" -> List("col1", "col2")
))
In questo caso si ottiene un RDD di (String, Map[String, Map[String, String]])
, dove il primo componente è il rowkey e la seconda è una mappa le cui le chiavi sono famiglie di colonne ei valori sono mappe le cui chiavi sono colonne e il cui contenuto sono i valori della cella.
Dove fallisce
Purtroppo, sembra che il mio lavoro ottiene un riferimento a sc
, che non è di per sé serializzabile di progettazione. Cosa ottengo quando corro il lavoro è
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
posso rimuovere le classi di supporto e utilizzare la stessa linea logica nel mio lavoro e tutto funziona bene. Ma voglio ottenere qualcosa che posso riutilizzare, invece di scrivere sempre la stessa caldaia.
A proposito, il problema non è specifico per implicito, anche utilizzando una funzione di sc
presenta lo stesso problema.
Per confronto, le seguenti helper per leggere i file TSV (lo so che è rotto in quanto non supporta citando e così via, non importa) sembra funzionare bene:
trait TsvReadSupport {
implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}
final class TsvRDD(val sc: SparkContext) extends Serializable {
def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
val contents = line.split(separator).toList
(fields, contents).zipped.toMap
}
}
Come posso incapsulare la logica per leggere le righe da HBase senza catturare involontariamente SparkContext?
Ho una risposta qui: http://stackoverflow.com/questions/23619775/spark-serialization- error/23628930 # 23628930 – samthebest