2015-08-03 21 views
9

Ho registri utente che ho preso da un csv e convertito in un DataFrame per sfruttare le funzionalità di query di SparkSQL. Un singolo utente creerà numerose voci all'ora e vorrei raccogliere alcune informazioni statistiche di base per ciascun utente; in realtà solo il conteggio delle istanze utente, la media e la deviazione standard di numerose colonne. Sono stato in grado di ottenere rapidamente la media e contare le informazioni utilizzando groupBy ($ "utente") e l'aggregatore di funzioni SparkSQL per il conteggio e AVG:Calcola la deviazione standard dei dati raggruppati in Spark DataFrame

val meanData = selectedData.groupBy($"user").agg(count($"logOn"), 
avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"), 
avg($"repliesPerHour"), avg($"duration")) 

Tuttavia, non riesco a trovare un modo altrettanto elegante per calcolare la deviazione standard. Finora posso calcolare solo mappando una stringa, doppia coppia e utilizzare StatCounter() stdev utility:.

val stdevduration = duration.groupByKey().mapValues(value => 
org.apache.spark.util.StatCounter(value).stdev) 

Ciò restituisce un RDD però, e vorrei cercare di mantenere tutto in un dataframe per ulteriori query possibili sui dati restituiti.

risposta

29

Spark 1.6+

È possibile utilizzare stddev_pop per calcolare deviazione standard della popolazione e stddev/stddev_samp di calcolare imparziale deviazione standard del campione:

import org.apache.spark.sql.functions.{stddev_samp, stddev_pop} 

selectedData.groupBy($"user").agg(stdev_pop($"duration")) 

Spark 1.5 e al di sotto (L'originale risposta):

Non è così bella e di parte (uguale al valore restituito da describe), ma usando la formula:

wikipedia sdev

si può fare qualcosa di simile:

import org.apache.spark.sql.functions.sqrt 

selectedData 
    .groupBy($"user") 
    .agg((sqrt(
     avg($"duration" * $"duration") - 
     avg($"duration") * avg($"duration") 
    )).alias("duration_sd")) 

È possibile, naturalmente, creare una funzione di ridurre l'ingombro:

import org.apache.spark.sql.Column 
def mySd(col: Column): Column = { 
    sqrt(avg(col * col) - avg(col) * avg(col)) 
} 

df.groupBy($"user").agg(mySd($"duration").alias("duration_sd")) 

E 'anche possibile utilizzare Hive UDF:

df.registerTempTable("df") 
sqlContext.sql("""SELECT user, stddev(duration) 
        FROM df 
        GROUP BY user""") 

Fonte dell'immagine: https://en.wikipedia.org/wiki/Standard_deviation

+2

funziona perfettamente! Grazie mille per l'ottima risposta e per l'aggiunta alla chiamata della funzione alias. – the3rdNotch

+0

errore di battitura super: stdev -> stddev – Jesse

+0

Ci sono tre funzioni nelle funzioni di SparkSQL: stddev, stddev_samp, stddev_pop. Quindi c'è più bisogno di un'implementazione personalizzata? –

Problemi correlati