15

Sto usando pyspark, caricando un file csv di grandi dimensioni in un dataframe con spark-csv, e come fase di pre-elaborazione ho bisogno di applicare una varietà di operazioni ai dati disponibili in una delle colonne (che contiene una stringa json). Ciò restituirà i valori X, ognuno dei quali deve essere memorizzato nella propria colonna separata.Apache Spark - Assegna il risultato di UDF a più colonne del dataframe

Tale funzionalità verrà implementata in una UDF. Tuttavia, non sono sicuro di come restituire un elenco di valori da quell'UDF e di inviarli in singole colonne. Ecco un semplice esempio:

(...) 
from pyspark.sql.functions import udf 
def udf_test(n): 
    return [n/2, n%2] 

test_udf=udf(udf_test) 


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4) 

Ciò produce i seguenti:

+------+----------+--------------------+ 
|amount|trans_date|    test| 
+------+----------+--------------------+ 
| 28.0|2016-02-07|   [14.0, 0.0]| 
| 31.01|2016-02-07|[15.5050001144409...| 
| 13.41|2016-02-04|[6.70499992370605...| 
| 307.7|2015-02-17|[153.850006103515...| 
| 22.09|2016-02-05|[11.0450000762939...| 
+------+----------+--------------------+ 
only showing top 5 rows 

quale sarebbe il modo migliore per conservare i due (in questo esempio) valori essendo restituito dal UDF su colonne separate? In questo momento essi vengono digitati come stringhe:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema() 

root 
|-- amount: float (nullable = true) 
|-- trans_date: string (nullable = true) 
|-- test: string (nullable = true) 

risposta

25

Non è possibile creare più colonne di alto livello da una singola chiamata UDF ma è possibile creare un nuovo struct. Richiede un'UDF con specificato returnType:

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("foo", FloatType(), False), 
    StructField("bar", FloatType(), False) 
]) 

def udf_test(n): 
    return (n/2, n % 2) if n and n != 0.0 else (float('nan'), float('nan')) 

test_udf = udf(udf_test, schema) 
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"]) 

foobars = df.select(test_udf("y").alias("foobar")) 
foobars.printSchema() 
## root 
## |-- foobar: struct (nullable = true) 
## | |-- foo: float (nullable = false) 
## | |-- bar: float (nullable = false) 

È appiattire ulteriormente lo schema con semplice select:

foobars.select("foobar.foo", "foobar.bar").show() 
## +---+---+ 
## |foo|bar| 
## +---+---+ 
## |1.0|0.0| 
## |1.5|1.0| 
## +---+---+ 

Vedi anche Derive multiple columns from a single column in a Spark DataFrame

+0

Fantastico! Funziona molto bene per quello di cui avevo bisogno. Ero quasi tutto lì, ma inserivo erroneamente lo schema StructType in udf, il che stava causando la mia nuova colonna come StringType. Grazie mille! –

+0

Grazie !! Questo era esattamente quello che stavo cercando. :) – dksahuji

Problemi correlati