2015-04-02 18 views
12

Ho un dataframe che include timestamp. Per aggregare in base al tempo (minuto, ora o giorno), ho provato come:Come utilizzare il valore costante in UDF di Spark SQL (DataFrame)

val toSegment = udf((timestamp: String) => { 
    val asLong = timestamp.toLong 
    asLong - asLong % 3600000 // period = 1 hour 
}) 

val df: DataFrame // the dataframe 
df.groupBy(toSegment($"timestamp")).count() 

Questo funziona correttamente.

mia domanda è come generalizzare l'UDF toSegment come

val toSegmentGeneralized = udf((timestamp: String, period: Int) => { 
    val asLong = timestamp.toLong 
    asLong - asLong % period 
}) 

Ho provato quanto segue ma non funziona

df.groupBy(toSegment($"timestamp", $"3600000")).count() 

sembra di trovare la colonna denominata 3600000.

La soluzione possibile è per utilizzare la colonna costante ma non sono riuscita a trovarla.

risposta

24

È possibile utilizzare org.apache.spark.sql.functions.lit() per creare la colonna costante:

import org.apache.spark.sql.functions._ 

df.groupBy(toSegment($"timestamp", lit(3600000))).count() 
+1

La funzione acceso grandi opere se si dispone di una stringa o INT a passare in fallisce miseramente con qualcosa come un Array/List.. Qualche idea su cosa fare lì? –

+0

Questo pacchetto ha anche una funzione chiamata 'array()' che potresti essere in grado di usare per combinare un gruppo di colonne letterali - non l'ho provato. Potrebbe non essere troppo difficile creare una funzione analoga per le liste, specialmente se si guarda all'implementazione di 'array()' in [functions.scala] (https://github.com/apache/spark/blob/master/ sql/core/src/main/scala/org/apache/spark/sql/functions.scala) - uno non sembra esistere. –

+0

Ora, avendo provato a usare 'array()', dovrei sottolineare che il parametro UDF corrispondente deve essere di tipo 'ArrayBuffer [T]' per alcuni 'T'. –

Problemi correlati