2015-07-16 14 views

risposta

69

Con SQL prime è possibile utilizzare CONCAT:

  • In Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) 
    df.registerTempTable("df") 
    sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 
    
  • In Scala

    import sqlContext.implicits._ 
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") 
    df.registerTempTable("df") 
    sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 
    

Dal momento che Spark 1.5.0 è possibile utilizzare concat funzione wit h dataframe API:

  • In Python:

    from pyspark.sql.functions import concat, col, lit 
    
    df.select(concat(col("k"), lit(" "), col("v"))) 
    
  • In Scala:

    import org.apache.spark.sql.functions.{concat, lit} 
    
    df.select(concat($"k", lit(" "), $"v")) 
    

C'è anche concat_ws funzione che prende un separatore stringa come primo argomento.

+0

Cosa succede se dataFrame ha valore nullo? come questo df = sqlContext.createDataFrame ([("foo", 1), ("bar", 2), ("check", null)], ("k", "v")) –

+1

@TarunKumar Vuoi dire qualcosa come [questo] (http://stackoverflow.com/a/33152113/1560062)? – zero323

+0

questo è quello che volevo. grazie –

14

Se si desidera utilizzare DF, è possibile utilizzare un udf per aggiungere una nuova colonna in base alle colonne esistenti.

val sqlContext = new SQLContext(sc) 
case class MyDf(col1: String, col2: String) 

//here is our dataframe 
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F")) 
)) 

//Define a udf to concatenate two passed in string values 
val getConcatenated = udf((first: String, second: String) => { first + " " + second }) 

//use withColumn method to add a new column called newColName 
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show() 
+0

C'è un modo per concatenare dinamicamente le colonne da una stringa di input? – ashK

+0

Questo non è ottimale, rispetto a DataFrame.concat_ws, poiché Spark non ottimizza molto bene udfs/affatto. Naturalmente, nel momento in cui hai bisogno di una logica personalizzata nella tua concatenazione, non sarai in grado di evitare l'udf. –

4

Ecco un altro modo di fare questo per pyspark:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit 

#Create your data frame 
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa']) 

#Use select, concat, and lit functions to do the concatenation 
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African')) 

#Show the new data frame 
personDF.show() 

----------RESULT------------------------- 

84 
+------------+ 
|East African| 
+------------+ 
| Ethiopian| 
|  Kenyan| 
|  Ugandan| 
|  Rwandan| 
+------------+ 
0

Un altro modo per farlo in pySpark usando SqlContext ...

#Suppose we have a dataframe: 
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2']) 

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname')) 
13

Ecco come si può fare denominazione personalizzata

import pyspark 
from pyspark.sql import functions as sf 
sc = pyspark.SparkContext() 
sqlc = pyspark.SQLContext(sc) 
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2']) 
df.show() 

dà,

+--------+--------+ 
|colname1|colname2| 
+--------+--------+ 
| row11| row12| 
| row21| row22| 
+--------+--------+ 

creare nuova colonna concatenando:

df = df.withColumn('joined_column', 
        sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2'))) 
df.show() 

+--------+--------+-------------+ 
|colname1|colname2|joined_column| 
+--------+--------+-------------+ 
| row11| row12| row11_row12| 
| row21| row22| row21_row22| 
+--------+--------+-------------+ 
+0

Perché chiami 'sf.lit ('_')' e non solo ''_''? –

+2

'lit' crea una colonna di' _' – muon

3

Ecco un suggerimento per quando non si conosce il numero o il nome delle colonne nel dataframe.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*)) 
Problemi correlati