2015-05-20 17 views
6

Ho un DataFrame su cui sto operando, e voglio raggruppare una serie di colonne e operare per gruppo sul resto delle colonne. Nel regolare RDD -land penso che sarebbe simile a questa:Spark DataFrame: operare sui gruppi

rdd.map(tup => ((tup._1, tup._2, tup._3), tup)). 
    groupByKey(). 
    forEachPartition(iter => doSomeJob(iter)) 

In DataFrame -land mi piacerebbe iniziare in questo modo:

df.groupBy("col1", "col2", "col3") // Reference by name 

ma poi non sono sicuro di come operare su i gruppi se le mie operazioni sono più complicate della media/min/max/conteggio offerte da GroupedData.

Per esempio, voglio costruire un unico documento MongoDB per ("col1", "col2", "col3") gruppo (scorrendo le associate s nel gruppo), ridimensionare a N partizioni, quindi inserire i documenti in un database MongoDB. Il limite N corrisponde al numero massimo di connessioni simultanee che desidero.

Qualche consiglio?

+2

Modo migliore: scrivere un UDAF (non ancora supportato, vedere SPARK-4233 e SPARK-3947). Fino ad allora, usa DF.RDD per accedere ai metodi RDD come aggregateByKey per ottenere ciò che vuoi creare –

risposta

1

È possibile eseguire un auto join. In primo luogo ottenere i gruppi:

val groups = df.groupBy($"col1", $"col2", $"col3").agg($"col1", $"col2", $"col3") 

Quindi è possibile aderire a questo nuovo al dataframe originale:

val joinedDF = groups 
    .select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3) 
    .join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3") 

Mentre questo si ottiene esattamente gli stessi dati si aveva in origine (e con 3 supplementari, colonne ridondanti) puoi fare un altro join per aggiungere una colonna con l'ID del documento MongoDB per il gruppo (col1, col2, col3) associato alla riga.

In ogni caso, nella mia esperienza, join e self-join sono il modo in cui gestisci le cose complicate in DataFrames.

+0

Non sono sicuro di come prenderlo da lì - cosa mi permetterebbe di scorrere tutte le '$" col4 "' & '$ "col5" 'valori associati a una particolare combinazione' ($ "col1", $ "col2", $ "col3") '? –

+0

Il modo in cui funzionano i DataFrames, hai solo due opzioni. O fai qualcosa come suArray sui gruppi, poi fai un foreach e all'interno dei loop create un DataFrame filtrando su cols1 a cols3. O devi fare tutto in un singolo DataFrame usando complicati join come stavo tentando di alludere a. –

Problemi correlati