2015-09-08 15 views
13

ho principale che crea contesto scintilla:Spark sql dataframe - Import sqlContext.implicits._

val sc = new SparkContext(sparkConf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

Poi crea dataframe e fa filtri e convalide sulla dataframe.

val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00") 

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0)) 
    // record length cannot be < 2 
    .na.drop(3) 
    // round to hours 
    .withColumn("time",convertToHourly($"time")) 

Questo funziona benissimo.

ma quando provo a muovere le convalide in un altro file inviando il dataframe a

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 

che ottiene il dataframe & fa le convalide e trasformazioni: sembra che ho bisogno del

import sqlContext.implicits._ 

To avoid the error: “value $ is not a member of StringContext” that happens on line: .withColumn("time",convertToHourly($"time"))

Ma per utilizzare il import sqlContext.implicits._ Ho anche bisogno del sqlContext definito nel nuovo file in questo modo:

val sc = new SparkContext(sparkConf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

o inviarlo al

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 
function 

mi sento come la separazione che sto cercando di fare per 2 file (principale & convalida) non è fatto correttamente ...

Qualche idea su come progettare questo? O semplicemente inviare sqlContext alla funzione?

Grazie!

+0

Quando voglio separare le cose come che ho appena passo SqlContext nel costruttore della nuova classe e poi importare sqlContext.implicits._ una volta per ogni classe. Non potrei inventarmi nulla di meglio quindi voto questa domanda e aspetto migliori suggerimenti. – Niemand

risposta

11

È possibile utilizzare un'istanza singleton di SQLContext. Si può dare un'occhiata a questo esempio nel spark repository

/** Lazily instantiated singleton instance of SQLContext */ 
object SQLContextSingleton { 

    @transient private var instance: SQLContext = _ 

    def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
     instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
} 
... 
//And wherever you want you can do 
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
import sqlContext.implicits._ 
+1

Grazie! Ho usato l'oggetto Singleton, ma nel mio caso lo voglio creato una sola volta così ha fatto: oggetto SQLContextSingleton { @transient var instance: SQLContext = _ }, quindi inizializzato da main, e usato su validazioni. Grazie per l'aiuto! –

Problemi correlati