2015-09-18 20 views
6

Sto cercando di filtrare un dataframe contro un altro:Come filtrare una dataframe scintilla contro un altro dataframe

scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id") 
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id") 

Ora voglio filtrare DF1 e tornare un dataframe che contiene tutte le righe in df1 dove user_id è in df2 ("valid_id"). In altre parole, voglio che tutte le righe in cui il df1 id_utente è o 2,3,4,5 o 6

scala> df1.select("user_id").filter($"user_id" in df2("valid_id")) 
warning: there were 1 deprecation warning(s); re-run with -deprecation for details 
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20); 

D'altra parte, quando provo a fare un filtro contro una funzione, tutto sembra grande :

scala> df1.select("user_id").filter(($"user_id" % 2) === 0) 
res1: org.apache.spark.sql.DataFrame = [user_id: int] 

Perché viene visualizzato questo errore? C'è qualcosa di sbagliato nella mia sintassi?

seguente commento che ho cercato di fare un join esterno sinistro:

scala> df1.show 
+-------+------------------+-------+ 
| name|    score|user_id| 
+-------+------------------+-------+ 
| user 1|    0.123|  1| 
| user 2|    0.246|  2| 
| user 3|    0.369|  3| 
| user 4|    0.492|  4| 
| user 5|    0.615|  5| 
| user 6|    0.738|  6| 
| user 7|    0.861|  7| 
| user 8|    0.984|  8| 
| user 9|    1.107|  9| 
|user 10|    1.23|  10| 
|user 11|    1.353|  11| 
|user 12|    1.476|  12| 
|user 13|    1.599|  13| 
|user 14|    1.722|  14| 
|user 15|    1.845|  15| 
|user 16|    1.968|  16| 
|user 17|    2.091|  17| 
|user 18|    2.214|  18| 
|user 19|2.3369999999999997|  19| 
|user 20|    2.46|  20| 
+-------+------------------+-------+ 
only showing top 20 rows 

scala> df2.show 
+--------+ 
|valid_id| 
+--------+ 
|  2| 
|  3| 
|  4| 
|  5| 
|  6| 
+--------+ 

scala> df1.join(df2, df1("user_id") === df2("valid_id")) 
res6: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int] 
scala> res6.collect 
res7: Array[org.apache.spark.sql.Row] = Array() 

scala> df1.join(df2, df1("user_id") === df2("valid_id"), "left_outer") 
res8: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int] 
scala> res8.count 
res9: Long = 0 

Io corro scintilla 1.5.0 con scala 2.10.5

+0

Si desidera filtrare o eseguire una giunzione su due frame di dati? – eliasah

+0

@eliasah Voglio ottenere un dataframe con un sottoinsieme delle righe da df1. per ogni riga r in df1, se il valore di r ("user_id") è in df2 ("valid_id"), la riga r sarà inclusa nel dataframe del risultato. – polo

+0

Quindi dovrai eseguire un join esterno sinistro da df1 a df2 su userId == validId – eliasah

risposta

11

Volete un (normale) join interno, non un join esterno :)

df1.join(df2, df1("user_id") === df2("valid_id")) 
+0

Definitivamente! Scusa colpa mia! Ora so che non è una buona idea andare su SO con l'insonnia :) – eliasah

+0

@ glennie-helles-sindholt: Grazie per la tua risposta. Questo ha senso, ma restituisce un dataframe vuoto. Vedi le mie modifiche con un esempio di codice nella domanda. – polo

+0

@polo Devo dire che tutto sembra essere giusto, per quanto posso vedere. Ho appena copiato i tuoi comandi nella mia shell (anche con Spark 1.5.0) e tutto funziona perfettamente. Non hai per caso qualche esplicito 'val sc = new SparkContext (conf)' da qualche parte nella tua shell, vero? Recentemente ho inciampato in qualcun altro che ha visto uno strano comportamento perché aveva dichiarato la sua sc-variabile. Altrimenti, penso di essere appena uscito dalle idee perché semplicemente non riesco a riprodurre il problema. Immagino tu abbia provato a rilanciare la tua shell? –

Problemi correlati