2014-05-19 13 views
9

Sto cercando di utilizzare Spark per elaborare i dati provenienti dalle tabelle HBase. This blog post fornisce un esempio di come utilizzare NewHadoopAPI per leggere i dati da qualsiasi Hadoop InputFormat.Arricchimento di SparkContext senza incorrere in problemi di serializzazione

Quello che ho fatto

Da quando ho bisogno di fare questo molte volte, stavo cercando di usare impliciti per arricchire SparkContext, in modo che possa ottenere un RDD da un dato insieme di colonne in HBase. Ho scritto il seguente aiuto:

trait HBaseReadSupport { 
    implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc) 

    implicit def bytes2string(bytes: Array[Byte]) = new String(bytes) 
} 


final class HBaseSC(sc: SparkContext) extends Serializable { 
    def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) = 
    data map { case (cf, columns) => 
     val content = columns map { column => 
     val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes) 

     column -> interpret(CellUtil.cloneValue(cell)) 
     } toMap 

     cf -> content 
    } 

    def makeConf(table: String) = { 
    val conf = HBaseConfiguration.create() 

    conf.setBoolean("hbase.cluster.distributed", true) 
    conf.setInt("hbase.client.scanner.caching", 10000) 
    conf.set(TableInputFormat.INPUT_TABLE, table) 

    conf 
    } 

    def hbase[A](table: String, data: Map[String, List[String]]) 
    (interpret: Array[Byte] => A) = 

    sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat], 
     classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) => 
     Bytes.toString(key.get) -> extract(data, row, interpret) 
     } 

} 

Esso può essere utilizzato come

val rdd = sc.hbase[String](table, Map(
    "cf" -> List("col1", "col2") 
)) 

In questo caso si ottiene un RDD di (String, Map[String, Map[String, String]]), dove il primo componente è il rowkey e la seconda è una mappa le cui le chiavi sono famiglie di colonne ei valori sono mappe le cui chiavi sono colonne e il cui contenuto sono i valori della cella.

Dove fallisce

Purtroppo, sembra che il mio lavoro ottiene un riferimento a sc, che non è di per sé serializzabile di progettazione. Cosa ottengo quando corro il lavoro è

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) 

posso rimuovere le classi di supporto e utilizzare la stessa linea logica nel mio lavoro e tutto funziona bene. Ma voglio ottenere qualcosa che posso riutilizzare, invece di scrivere sempre la stessa caldaia.

A proposito, il problema non è specifico per implicito, anche utilizzando una funzione di sc presenta lo stesso problema.

Per confronto, le seguenti helper per leggere i file TSV (lo so che è rotto in quanto non supporta citando e così via, non importa) sembra funzionare bene:

trait TsvReadSupport { 
    implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc) 
} 

final class TsvRDD(val sc: SparkContext) extends Serializable { 
    def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line => 
    val contents = line.split(separator).toList 

    (fields, contents).zipped.toMap 
    } 
} 

Come posso incapsulare la logica per leggere le righe da HBase senza catturare involontariamente SparkContext?

+0

Ho una risposta qui: http://stackoverflow.com/questions/23619775/spark-serialization- error/23628930 # 23628930 – samthebest

risposta

12

Basta aggiungere @transient annotazione sc variabile:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable { 
    ... 
} 

e assicurarsi sc non viene utilizzato entro extract funzione, dal momento che non sarà disponibile sui lavoratori.

Se è necessario accedere contesto Spark dall'interno di calcolo distribuito, rdd.context funzione potrebbe essere utilizzata:

val rdd = sc.newAPIHadoopRDD(...) 
rdd map { 
    case (k, v) => 
    val ctx = rdd.context 
    .... 
} 
+0

Grazie !!! Non l'avrei mai scoperto! – Andrea

+1

quando faccio riferimento al contesto all'interno della chiusura, ricevo NotSerilizableException? Devo usare un contesto all'interno di una chiusura per accedere a un file su HDFS. Come può essere realizzato? – VishAmdi

Problemi correlati