2015-11-12 9 views
5

Ho difficoltà durante il lavoro con i frame di dati in spark con Scala. Se ho un frame di dati che voglio estrarre una colonna di voci univoche, quando uso groupBy non ottengo un frame di dati indietro.Uso di groupBy in Spark e ritorno a un DataFrame

Per esempio, ho un DataFrame chiamati registri che ha la seguente forma:

machine_id | event  | other_stuff 
34131231 | thing  | stuff 
83423984 | notathing | notstuff 
34131231 | thing | morestuff 

e vorrei gli ID macchina unica dove l'evento è cosa salva in una nuova DataFrame per permettermi di fare un po 'di filtraggio di qualche tipo. Utilizzando

val machineId = logs 
    .where($"event" === "thing") 
    .select("machine_id") 
    .groupBy("machine_id") 

ho un val di dati raggruppati di nuovo che è un dolore nel culo per utilizzare (o non so come usare questo tipo di oggetto correttamente). Avendo ottenuto questo elenco di ID macchina univoci, voglio quindi usarlo nel filtrare un altro DataFrame per estrarre tutti gli eventi per i singoli ID macchina.

posso vedere io voglio fare questo genere di cose abbastanza regolarmente e il flusso di lavoro di base è: id

  1. estratto uniche da una tabella di log.
  2. Utilizzare ID univoci per estrarre tutti gli eventi per un determinato ID.
  3. Utilizzare alcuni tipi di analisi su questi dati che sono stati estratti.

Sono i primi due gradini che apprezzerei un po 'di guida qui.

Apprezzo che questo esempio sia un po 'forzato ma spero che spieghi qual è il mio problema. Può darsi che non ne sappia abbastanza sugli oggetti GroupedData o (come spero) mi manca qualcosa nei frame di dati che lo rende facile. Sto usando la scintilla 1.5 costruita su Scala 2.10.4.

Grazie

risposta

7

Basta usare distinct non groupBy:

val machineId = logs.where($"event"==="thing").select("machine_id").distinct 

Quale sarà equivalente a SQL:

SELECT DISTINCT machine_id FROM logs WHERE event = 'thing' 

GroupedData non è destinato ad essere utilizzato direttamente. Fornisce una serie di metodi, in cui agg è il più generale, che può essere utilizzato per applicare diverse funzioni di aggregazione e convertirlo nuovamente in DataFrame. In termini di SQL quello che hai dopo where e groupBy equivale a qualcosa di simile

SELECT machine_id, ... FROM logs WHERE event = 'thing' GROUP BY machine_id 

dove ... deve essere fornito da agg o un metodo equivalente.

1

Un gruppo da in scintilla seguito dall'aggregazione e quindi un'istruzione select restituirà un frame di dati. Per il tuo esempio dovrebbe essere qualcosa del tipo:

val machineId = logs 
    .groupBy("machine_id", "event") 
    .agg(max("other_stuff")) 
    .select($"machine_id").where($"event" === "thing") 
Problemi correlati