2016-06-09 10 views
8

Mi chiedevo se esiste un modo per specificare una funzione di aggregazione personalizzata per i frame di dati spark su più colonne.Aggregazione di più colonne con funzione personalizzata nella scintilla

Ho una tabella come questa del tipo (nome, articolo, prezzo):

john | tomato | 1.99 
john | carrot | 0.45 
bill | apple | 0.99 
john | banana | 1.29 
bill | taco | 2.59 

a:

vorrei aggregare la voce ed è il costo per ogni persona in un elenco In questo modo:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29) 
bill | (apple, 0.99), (taco, 2.59) 

E 'possibile nei dataframes? Recentemente ho appreso su collect_list ma sembra funzionare solo per una colonna.

risposta

15

Il modo più semplice per farlo come un DataFrame è quello di raccogliere prime due liste, e quindi utilizzare un UDF a zip le due liste insieme. Qualcosa di simile:

import org.apache.spark.sql.functions.{collect_list, udf} 
import sqlContext.implicits._ 

val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_)) 

val df = Seq(
    ("john", "tomato", 1.99), 
    ("john", "carrot", 0.45), 
    ("bill", "apple", 0.99), 
    ("john", "banana", 1.29), 
    ("bill", "taco", 2.59) 
).toDF("name", "food", "price") 

val df2 = df.groupBy("name").agg(
    collect_list(col("food")) as "food", 
    collect_list(col("price")) as "price" 
).withColumn("food", zipper(col("food"), col("price"))).drop("price") 

df2.show(false) 
# +----+---------------------------------------------+ 
# |name|food           | 
# +----+---------------------------------------------+ 
# |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| 
# |bill|[[apple,0.99], [taco,2.59]]     | 
# +----+---------------------------------------------+ 
+0

Buona risposta! :) – eliasah

+1

Ho usato 'col (...)' invece di '$" ... "' per una ragione - trovo 'col (...)' funziona con meno lavoro all'interno di cose come le definizioni di 'class' . –

+0

Esiste una funzione per riallineare le colonne come ad esempio nella funzione zip dirlo di aggiungere prima un elemento dalla coda della colonna e rimuoverne uno dalla testa e poi comprimerlo? In questo caso puoi avere ad esempio il prossimo prezzo per gli articoli se leggi i prezzi ogni giorno e c'è una colonna temporale. –

2

Questa è un'opzione convertendo il frame di dati in un RDD di Map e quindi chiamando uno groupByKey su di esso. Il risultato sarebbe un elenco di coppie chiave-valore in cui il valore è un elenco di tuple.

df.show 
+----+------+----+ 
| _1| _2| _3| 
+----+------+----+ 
|john|tomato|1.99| 
|john|carrot|0.45| 
|bill| apple|0.99| 
|john|banana|1.29| 
|bill| taco|2.59| 
+----+------+----+ 


val tuples = df.map(row => row(0) -> (row(1), row(2))) 
tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43 

tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect 
res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29)))) 
15

Si consiglia di utilizzare la funzione struct per raggruppare le colonne insieme prima di raccogliere in un elenco:

import org.apache.spark.sql.functions.{collect_list, struct} 
import sqlContext.implicits._ 

val df = Seq(
    ("john", "tomato", 1.99), 
    ("john", "carrot", 0.45), 
    ("bill", "apple", 0.99), 
    ("john", "banana", 1.29), 
    ("bill", "taco", 2.59) 
).toDF("name", "food", "price") 

df.groupBy($"name") 
    .agg(collect_list(struct($"food", $"price")).as("foods")) 
    .show(false) 

Uscite:

+----+---------------------------------------------+ 
|name|foods          | 
+----+---------------------------------------------+ 
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| 
|bill|[[apple,0.99], [taco,2.59]]     | 
+----+---------------------------------------------+ 
+0

Voglio dire che questo approccio sembra più pulito di quello accettato rispondi, ma sfortunatamente non funziona con spark 1.6, perché 'collect_list()' non accetta una struct. – trudolf

Problemi correlati