2016-01-22 12 views
13

Ho tre array di tipo stringa contenente le seguenti informazioni:più operazioni di aggregazione sulla stessa colonna di un dataframe scintilla

  • groupBy matrice: contenenti i nomi delle colonne che voglio gruppo miei dati da parte.
  • array aggregato: contenente i nomi delle colonne che voglio aggregare.
  • operazioni matrice: contenente le operazioni di aggregazione che voglio eseguire

Sto cercando di usare i frame di dati scintilla per raggiungere questo obiettivo. I frame di dati Spark forniscono un agg() in cui è possibile passare una Mappa [String, String] (di nome colonna e rispettiva operazione di aggregazione) come input, tuttavia voglio eseguire diverse operazioni di aggregazione sulla stessa colonna dei dati. Qualche suggerimento su come ottenere questo?

risposta

26

Scala:

È possibile ad esempio mappa più di un elenco di funzioni con un definito mapping dal nome in funzione di:

import org.apache.spark.sql.functions.{col, min, max, mean} 
import org.apache.spark.sql.Column 

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v") 
val mapping: Map[String, Column => Column] = Map(
    "min" -> min, "max" -> max, "mean" -> avg) 

val groupBy = Seq("k") 
val aggregate = Seq("v") 
val operations = Seq("min", "max", "mean") 
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c)))) 

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show 
// +---+------+------+------+ 
// | k|min(v)|max(v)|avg(v)| 
// +---+------+------+------+ 
// | 1| 3.0| 3.0| 3.0| 
// | 2| -5.0| -5.0| -5.0| 
// +---+------+------+------+ 

o

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show 

Purtroppo parser che è usato internamente SQLContext non è esposto pubblicamente ma puoi sempre provare a costruire SQL semplice q ueries:

df.registerTempTable("df") 
val groupExprs = groupBy.mkString(",") 
val aggExprs = aggregate.flatMap(c => operations.map(
    f => s"$f($c) AS ${c}_${f}") 
).mkString(",") 

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs") 

Python:

from pyspark.sql.functions import mean, sum, max, col 

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"]) 
groupBy = ["k"] 
aggregate = ["v"] 
funs = [mean, sum, max] 

exprs = [f(col(c)) for f in funs for c in aggregate] 

# or equivalent df.groupby(groupBy).agg(*exprs) 
df.groupby(*groupBy).agg(*exprs) 
+0

Questo funziona molto bene. Grazie mille. :) –

+0

@ zero323 Sai per caso come fare con l'API python? – lanenok

+0

@lanenok Più o meno allo stesso modo. Basta sostituire flatMap con le comprensioni. – zero323

Problemi correlati