2016-02-07 20 views
13

Ho due dataframes con le seguenti colonne:Spark colonne duplicate in dataframe dopo si uniscono

df1.columns 
// Array(ts, id, X1, X2) 

e

df2.columns 
// Array(ts, id, Y1, Y2) 

Dopo faccio

val df_combined = df1.join(df2, Seq(ts,id)) 

io alla fine con il seguente colonne: Array(ts, id, X1, X2, ts, id, Y1, Y2). Potrei aspettarmi che le colonne comuni vengano eliminate. C'è qualcosa di aggiuntivo che deve essere fatto?

+0

Se si definiscono le colonne di join come un 'Seq' di stringhe (per i nomi di colonne), le colonne non devono essere duplicate. Vedi la mia risposta qui sotto. – stackoverflowuser2010

risposta

12

Questo è un comportamento previsto. DataFrame.join metodo è equivalente a SQL unirsi come questo

SELECT * FROM a JOIN b ON joinExprs 

Se si desidera ignorare le colonne duplicate solo farli cadere o selezionare le colonne di interesse in seguito. Se si desidera disambiguare è possibile utilizzare l'accesso di questi utilizzando genitore DataFrames:

val a: DataFrame = ??? 
val b: DataFrame = ??? 
val joinExprs: Column = ??? 

a.join(b, joinExprs).select(a("id"), b("foo")) 
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo")) 

o utilizzare pseudonimi:

// As for now aliases don't work with drop 
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo") 
+0

Invece di selezionare, posso rilasciare la colonna duplicata? – Neel

+0

Sì, ma solo tramite genitori non con alias. – zero323

+0

Che ne dici di un join esterno? Qualsiasi riga senza corrispondenza avrà un valore nullo in una delle colonne chiave della tabella, ma non si sa in anticipo quale deve essere eliminato. C'è un modo per gestire elegantemente questo caso? – Darryl

2

sono stato bloccato con questo per un po ', e solo di recente mi si avvicinò con un soluzione ciò che è abbastanza facile.

Dire a è

scala> val a = Seq(("a", 1), ("b", 2)).toDF("key", "vala") 
a: org.apache.spark.sql.DataFrame = [key: string, vala: int] 

scala> a.show 
+---+----+ 
|key|vala| 
+---+----+ 
| a| 1| 
| b| 2| 
+---+----+ 
and 
scala> val b = Seq(("a", 1)).toDF("key", "valb") 
b: org.apache.spark.sql.DataFrame = [key: string, valb: int] 

scala> b.show 
+---+----+ 
|key|valb| 
+---+----+ 
| a| 1| 
+---+----+ 

e posso fare questo per selezionare solo il valore in dataframe un:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show 
+---+----+ 
|key|vala| 
+---+----+ 
| a| 1| 
| b| 2| 
+---+----+ 
15

La semplice risposta (dal Databricks FAQ on this matter) è di effettuare il join in cui il le colonne unite sono espresse come una serie di stringhe (o una stringa) anziché un predicato.

Di seguito è riportato un esempio tratto dalle Domande frequenti sui Databricks ma con due colonne di join per rispondere alla domanda del poster originale.

Ecco la sinistra dataframe:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10)) 

val left = llist.toDF("firstname","lastname","date","duration") 

left.show() 

/* 
+---------+--------+----------+--------+ 
|firstname|lastname|  date|duration| 
+---------+--------+----------+--------+ 
|  bob|  b|2015-01-13|  4| 
| alice|  a|2015-04-23|  10| 
+---------+--------+----------+--------+ 
*/ 

Ecco la destra dataframe:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload") 

right.show() 

/* 
+---------+--------+------+ 
|firstname|lastname|upload| 
+---------+--------+------+ 
| alice|  a| 100| 
|  bob|  b| 23| 
+---------+--------+------+ 
*/ 

Ecco un errato soluzione , dove le colonne di join sono definiti come la predicato left("firstname")===right("firstname") && left("lastname")===right("lastname").

Il risultato errato è che le firstname e lastname colonne sono duplicati nella cornice dati unita:

left.join(right, left("firstname")===right("firstname") && 
       left("lastname")===right("lastname")).show 

/* 
+---------+--------+----------+--------+---------+--------+------+ 
|firstname|lastname|  date|duration|firstname|lastname|upload| 
+---------+--------+----------+--------+---------+--------+------+ 
|  bob|  b|2015-01-13|  4|  bob|  b| 23| 
| alice|  a|2015-04-23|  10| alice|  a| 100| 
+---------+--------+----------+--------+---------+--------+------+ 
*/ 

la correttezza soluzione è definire le colonne aderire come un array di stringhe Seq("firstname", "lastname").colonne La cornice di dati di output non dispone di duplicati:

left.join(right, Seq("firstname", "lastname")).show 

/* 
+---------+--------+----------+--------+------+ 
|firstname|lastname|  date|duration|upload| 
+---------+--------+----------+--------+------+ 
|  bob|  b|2015-01-13|  4| 23| 
| alice|  a|2015-04-23|  10| 100| 
+---------+--------+----------+--------+------+ 
*/ 
+3

in realtà l'output DF * ha * duplicati usando quanto segue; 'val join = sampledDF.join (idsDF, idColumns," inner ")'. e dove 'idColumns' è una Seq [String] contenente le colonne join – javadba

0

Questo è un comportamento normale da SQL, quello che sto facendo per questo:

  • goccia o rinominare le colonne di origine
  • Do il join
  • goccia rinominato colonna eventuale

Qui sto sostituendo colonna "fullname":

Alcuni di codice in Java:

this 
    .sqlContext 
    .read() 
    .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day)) 
    .drop("fullname") 
    .registerTempTable("data_original"); 

this 
    .sqlContext 
    .read() 
    .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day)) 
    .registerTempTable("data_v2"); 

this 
    .sqlContext 
    .sql(etlQuery) 
    .repartition(1) 
    .write() 
    .mode(SaveMode.Overwrite) 
    .parquet(outputPath); 

Se la query è:

SELECT 
    d.*, 
    concat_ws('_', product_name, product_module, name) AS fullname 
FROM 
    {table_source} d 
LEFT OUTER JOIN 
    {table_updates} u ON u.id = d.id 

Questo è qualcosa che si può fare solo con Spark Credo (eliminare la colonna dalla lista), molto molto utile!

Problemi correlati