2016-02-22 17 views
8

Voglio analizzare le colonne della data in un DataFrame e, per ciascuna colonna della data, la risoluzione per la data potrebbe cambiare (ad esempio 2011/01/10 => 2011/01 se la risoluzione è impostata su "Mese").Come posso passare parametri extra alle UDF in SparkSql?

ho scritto il seguente codice:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = 
{ 
    import org.apache.spark.sql.functions._ 
    val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)} 
    val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)} 

    val allColNames = dataframe.columns 
    val allCols = allColNames.map(name => dataframe.col(name)) 

    val mappedCols = 
    { 
    for(i <- allCols.indices) yield 
    { 
     schema(i) match 
     { 
     case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i))) 
     case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i)) 
     case _ => allCols(i) 
     } 
    } 
    } 

    dataframe.select(mappedCols:_*) 

}} 

Tuttavia non funziona. Sembra che io possa solo passare Column s alle UDF. E mi chiedo se sarà molto lento se converto il DataFrame in RDD e applichiamo la funzione su ogni riga.

Qualcuno conosce la soluzione corretta? Grazie!

risposta

25

basta usare un po 'di accattivarsi:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
    SparkDateTimeConverter.convertDate(x, resolution)) 

e usarlo come segue:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i)) 

Su un lato nota si dovrebbe dare un'occhiata a sql.functions.trunc e sql.functions.date_format. Questi dovrebbero almeno parte del lavoro senza utilizzare le UDF.

Nota:

In Spark 2.2 o versioni successive è possibile utilizzare la funzione typedLit:

import org.apache.spark.sql.functions.typedLit 

che supportano una gamma più ampia di letterali come Seq o Map.

+1

Grazie per la risposta e l'intuizione di accattivarsi! – DarkZero

+4

Ho scritto un tutorial su come utilizzare il currying per creare Spark UDF che accetta parametri aggiuntivi al momento dell'invocazione. https://gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 –

10

È possibile creare un letterale Column per passare ad un'UDF utilizzando la funzione lit(...) definito org.apache.spark.sql.functions

Ad esempio:

val takeRight = udf((s: String, i: Int) => s.takeRight(i)) 
df.select(takeRight($"stringCol", lit(1))) 
+1

Grazie, inizialmente ho usato anche 'lit', ma si scopre che le sue prestazioni non sono buone come l'altra risposta ... – DarkZero

Problemi correlati