2015-05-13 25 views
26

Sto cercando di capire la nuova API dataframe in Spark. sembra un buon passo in avanti, ma avendo problemi a fare qualcosa che dovrebbe essere piuttosto semplice. Ho un dataframe con 2 colonne, "ID" e "Quantità". Come esempio generico, dire che voglio restituire una nuova colonna chiamata "codice" che restituisce un codice basato sul valore di "Amt". Riesco a scrivere una cosa del genere functiin:Crea nuova colonna con funzione in Spark Dataframe

def coder(myAmt:Integer):String { 
    if (myAmt > 100) "Little" 
    else "Big" 
} 

Quando provo ad usare in questo modo:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet") 

myDF.withColumn("Code", coder(myDF("Amt"))) 

ottengo tipo errori di mismatch

found : org.apache.spark.sql.Column 
required: Integer 

Ho provato a cambiare la inserisci il tipo sulla mia funzione su org.apache.spark.sql.Column ma poi comincio a ottenere wrrors con la funzione di compilazione perché vuole un valore booleano nell'istruzione if.

Sto sbagliando? C'è un modo migliore/diverso per farlo rispetto all'utilizzo di Colonna?

Grazie per il vostro aiuto.

+0

'myDF.printSchema' preghiamo, vediamo la struttura della tabella di file. –

risposta

42

Diciamo che avete colonna "Amt" nello schema:

import org.apache.spark.sql.functions._ 
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet") 
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"} 
val sqlfunc = udf(coder) 
myDF.withColumn("Code", sqlfunc(col("Amt"))) 

Penso withColumn è il modo giusto per aggiungere una colonna

+2

Non ho mai visto una scrittura scritta in scala come hai fatto sopra. Per estensione, se avessi una funzione più complessa con più argomenti, dovrei scrivere qualcosa come: val coder: ((Int, Int) => String) = (arg1: Int, arg2: Int) => {if (arg1 < 100 && arg2 <100 ....? –

+1

@JCalbreath, è una funzione letterale in scala, vedere questo: http://stackoverflow.com/questions/5241147/what-is-a-function-literal-in-scala –

+0

Grazie! Molto utile! –

6

Dovremmo evitare di definire udf funzioni il più possibile a causa di il suo overhead di serialization e deserialization di colonne.

È possibile ottenere la soluzione con semplice funzione when scintilla, come di seguito

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet") 

myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big")) 
Problemi correlati