2016-04-25 9 views
7

Tutto il data types in pyspark.sql.types are:Come restituire un "tipo di tupla" in una UDF in PySpark?

__all__ = [ 
    "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", 
    "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType", 
    "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"] 

devo scrivere un UDF (in pyspark), che restituisce un array di tuple. Cosa posso dare al secondo argomento che è il tipo di ritorno del metodo udf? Sarebbe qualcosa sulle linee di ArrayType(TupleType()) ...

+0

La domanda del titolo non sembra corrispondere al corpo. La documentazione non spiega come impostare un valore di ritorno come * "tipo di contenitore di altro tipo" *? – jonrsharpe

+0

@jonrsharpe Ho cambiato il titolo. Speriamo che sia rappresentativo del corpo ora. – kamalbanga

risposta

11

Non esiste una cosa come TupleType in Spark. I tipi di prodotto sono rappresentati come structs con campi di tipo specifico. Per esempio, se si vuole restituire un array di coppie (intero, stringa) è possibile utilizzare lo schema come questo: l'utilizzo

from pyspark.sql.types import * 

schema = ArrayType(StructType([ 
    StructField("char", StringType(), False), 
    StructField("count", IntegerType(), False) 
])) 

Esempio:

from pyspark.sql.functions import udf 
from collections import Counter 

char_count_udf = udf(
    lambda s: Counter(s).most_common(), 
    schema 
) 

df = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["id", "value"]) 

df.select("*", char_count_udf(df["value"])).show(2, False) 

## +---+-----+-------------------------+ 
## |id |value|PythonUDF#<lambda>(value)| 
## +---+-----+-------------------------+ 
## |1 |foo |[[o,2], [f,1]]   | 
## |2 |bar |[[r,1], [a,1], [b,1]] | 
## +---+-----+-------------------------+ 
+0

La risposta sta funzionando, ma il mio caso è un po 'complesso. I miei dati di ritorno sono del tipo '[('a1', [('b1', 1), ('b2', 2)]), ('a2', [('b1', 1), ('b2 ', 2)])] 'e così creo un tipo come' ArrayType (StructType ([StructField ("date", StringType(), False), ArrayType (StructType ([StructField ("hashId", StringType(), False), StructField ("TimeSpent-Front", FloatType(), False), StructField ("TimeSpent-Back", FloatType(), False)]))])) 'che dà ** L'oggetto 'ArrayType' non ha attributo ' nome '** ... – kamalbanga

+1

'StructType' richiede una sequenza di' StructFields', quindi non puoi usare 'ArrayTypes' da solo. È necessario 'StructField' che memorizza' ArrayType'. Anche un consiglio: se ti ritrovi a creare strutture come questa dovresti probabilmente riconsiderare il modello di dati. Le strutture profondamente annidate sono difficili da gestire senza UDF e le UDF di Python sono tutt'altro che efficienti. – zero323

+0

Come posso semplicemente specificare lo schema in udf per restituire una lista. F.udf (lambda start_date, end_date: [0,1] se start_date pseudocode

4

StackOverflow mi tiene dirigere a questa domanda, così ho Immagino che aggiungerò alcune informazioni qui.

Tornando tipi semplici da UDF:

from pyspark.sql.types import * 
from pyspark.sql import functions as F 

def get_df(): 
    d = [(0.0, 0.0), (0.0, 3.0), (1.0, 6.0), (1.0, 9.0)] 
    df = sqlContext.createDataFrame(d, ['x', 'y']) 
    return df 

df = get_df() 
df.show() 

# +---+---+ 
# | x| y| 
# +---+---+ 
# |0.0|0.0| 
# |0.0|3.0| 
# |1.0|6.0| 
# |1.0|9.0| 
# +---+---+ 

func = udf(lambda x: str(x), StringType()) 
df = df.withColumn('y_str', func('y')) 

func = udf(lambda x: int(x), IntegerType()) 
df = df.withColumn('y_int', func('y')) 

df.show() 

# +---+---+-----+-----+ 
# | x| y|y_str|y_int| 
# +---+---+-----+-----+ 
# |0.0|0.0| 0.0| 0| 
# |0.0|3.0| 3.0| 3| 
# |1.0|6.0| 6.0| 6| 
# |1.0|9.0| 9.0| 9| 
# +---+---+-----+-----+ 

df.printSchema() 

# root 
# |-- x: double (nullable = true) 
# |-- y: double (nullable = true) 
# |-- y_str: string (nullable = true) 
# |-- y_int: integer (nullable = true) 

Quando interi non bastano:

df = get_df() 

func = udf(lambda x: [0]*int(x), ArrayType(IntegerType())) 
df = df.withColumn('list', func('y')) 

func = udf(lambda x: {float(y): str(y) for y in range(int(x))}, 
      MapType(FloatType(), StringType())) 
df = df.withColumn('map', func('y')) 

df.show() 
# +---+---+--------------------+--------------------+ 
# | x| y|    list|     map| 
# +---+---+--------------------+--------------------+ 
# |0.0|0.0|     []|    Map()| 
# |0.0|3.0|   [0, 0, 0]|Map(2.0 -> 2, 0.0...| 
# |1.0|6.0| [0, 0, 0, 0, 0, 0]|Map(0.0 -> 0, 5.0...| 
# |1.0|9.0|[0, 0, 0, 0, 0, 0...|Map(0.0 -> 0, 5.0...| 
# +---+---+--------------------+--------------------+ 

df.printSchema() 
# root 
# |-- x: double (nullable = true) 
# |-- y: double (nullable = true) 
# |-- list: array (nullable = true) 
# | |-- element: integer (containsNull = true) 
# |-- map: map (nullable = true) 
# | |-- key: float 
# | |-- value: string (valueContainsNull = true) 

Tornando tipi di dati complessi da UDF:

df = get_df() 
df = df.groupBy('x').agg(F.collect_list('y').alias('y[]')) 
df.show() 

# +---+----------+ 
# | x|  y[]| 
# +---+----------+ 
# |0.0|[0.0, 3.0]| 
# |1.0|[9.0, 6.0]| 
# +---+----------+ 

schema = StructType([ 
    StructField("min", FloatType(), True), 
    StructField("size", IntegerType(), True), 
    StructField("edges", ArrayType(FloatType()), True), 
    StructField("val_to_index", MapType(FloatType(), IntegerType()), True) 
    # StructField('insanity', StructType([StructField("min_", FloatType(), True), StructField("size_", IntegerType(), True)])) 

]) 

def func(values): 
    mn = min(values) 
    size = len(values) 
    lst = sorted(values)[::-1] 
    val_to_index = {x: i for i, x in enumerate(values)} 
    return (mn, size, lst, val_to_index) 

func = udf(func, schema) 
dff = df.select('*', func('y[]').alias('complex_type')) 
dff.show(10, False) 

# +---+----------+------------------------------------------------------+ 
# |x |y[]  |complex_type           | 
# +---+----------+------------------------------------------------------+ 
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]| 
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]| 
# +---+----------+------------------------------------------------------+ 

dff.printSchema() 

# +---+----------+------------------------------------------------------+ 
# |x |y[]  |complex_type           | 
# +---+----------+------------------------------------------------------+ 
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]| 
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]| 
# +---+----------+------------------------------------------------------+ 

passaggio di molteplici argomenti a una funzione definita dall'utente:

df = get_df() 
func = udf(lambda arr: arr[0]*arr[1],FloatType()) 
df = df.withColumn('x*y', func(F.array('x', 'y'))) 

    # +---+---+---+ 
    # | x| y|x*y| 
    # +---+---+---+ 
    # |0.0|0.0|0.0| 
    # |0.0|3.0|0.0| 
    # |1.0|6.0|6.0| 
    # |1.0|9.0|9.0| 
    # +---+---+---+ 

Il codice è puramente a scopo dimostrativo, tutte le trasformazioni di cui sopra sono disponibili in codice Spark e produrranno prestazioni molto migliori. Come @ zero323 nel commento sopra, le UDF dovrebbero generalmente essere evitate in pyspark; restituire tipi complessi dovrebbe farti pensare a semplificare la tua logica.

Problemi correlati