2016-04-12 17 views
13

Come un esempio semplificato, ho un dataframe "df" con colonne "col1, col2" e voglio calcolare una riga massima-saggio dopo l'applicazione di una funzione di ciascuna colonna:PySpark row-wise funzione composizione

def f(x): 
    return (x+1) 

max_udf=udf(lambda x,y: max(x,y), IntegerType()) 
f_udf=udf(f, IntegerType()) 

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2))) 

Quindi, se df:

col1 col2 
1  2 
3  0 

Poi

DF2:

col1 col2 result 
1  2  3 
3  0  4 

È possibile che questo non sembra funzionare e produce "Impossibile valutare l'espressione: PythonUDF # f ..."

Sono assolutamente positivo "f_udf" funziona bene sul mio tavolo, e il problema principale è con il max_udf.

Senza creare colonne aggiuntive o utilizzare la mappa di base/riduci, c'è un modo per fare tutto quanto sopra usando i dataframes e udfs? Come dovrei modificare "max_udf"?

Ho anche provato:

max_udf=udf(max, IntegerType()) 

che produce lo stesso errore.

Ho anche confermato che le seguenti opere:

df2=(df.withColumn("temp1", f_udf(df.col1)) 
     .withColumn("temp2", f_udf(df.col2)) 

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2)) 

Perché è che io non posso fare queste in una volta sola?

Mi piacerebbe vedere una risposta che generalizza a qualsiasi funzione "f_udf" e "max_udf".

risposta

21

Ho avuto un problema simile e trovato la soluzione nella risposta alla this stackoverflow question

Per passare più colonne o un intero riga per un UDF utilizzare un struct:

from pyspark.sql.functions import udf, struct 
from pyspark.sql.types import IntegerType 

df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b")) 

count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType()) 

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns]))) 

new_df.show() 

ritorni:

+----+----+----------+ 
| a| b|null_count| 
+----+----+----------+ 
|null|null|   2| 
| 1|null|   1| 
|null| 2|   1| 
+----+----+----------+ 
+0

Grazie, questa è la prima vera risposta a questa domanda! –

+0

@AlexR. - Se sei soddisfatto di questa risposta, per favore accettala! – proinsias

7

UserDefinedFunction genera un errore mentre accetta le UDF come argomenti.

È possibile modificare il max_udf come sotto per farlo funzionare.

df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"]) 

max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType()) 

df2 = df.withColumn("result", max_udf(df.col1, df.col2)) 

O

def f_udf(x): 
    return (x + 1) 

max_udf = udf(lambda x, y: max(x, y), IntegerType()) 
## f_udf=udf(f, IntegerType()) 

df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2))) 

Nota:

Il secondo approccio è valida se e solo se le funzioni interne (qui f_udf) generano espressioni SQL valide.

Funziona qui perché f_udf(df.col1) e f_udf(df.col2) sono valutati come Column<b'(col1 + 1)'> e Column<b'(col2 + 1)'>, rispettivamente, prima di essere passato a max_udf. Non funzionerebbe con la funzione arbitraria.

Non funzionerebbe se proviamo per esempio qualcosa di simile:

from math import exp 

df.withColumn("result", max_udf(exp(df.col1), exp(df.col2))) 
+0

Grazie per la risposta! Potresti chiarire il secondo approccio? Sono confuso da come non hai bisogno di f_udf per essere un UDF valido per applicarlo alla colonna del frame dei dati? –

+0

Anche la seconda risposta sembra sfruttare il fatto che le colonne del dataframe rispondono alle operazioni "+". C'è qualcosa che generalizza questo ad altri "f_udf"? In generale, se ho un numero di diverse funzioni "f_udf", dovrei scrivere un set separato di funzioni max_udf per ognuna? –

+0

Mi dispiace, sono anche nuovo per accendere. Ho notato che posso eseguire operazioni su colonne con normali funzioni senza convertirle in UDF. Puoi sollevarlo come una domanda separata? Ho bisogno di sapere anche l'an – Mohan