Spark ora offre funzioni predefinite che possono essere utilizzate nei dataframes e sembra che siano altamente ottimizzate. La mia domanda iniziale stava per essere più veloce, ma ho fatto alcuni test e ho trovato che le funzioni spark erano circa 10 volte più veloci almeno in un'istanza. Qualcuno sa perché è così, e quando sarebbe un udf più veloce (solo per le istanze che esiste una funzione scintilla identica)?Funzioni spark vs prestazioni UDF?
Ecco il mio codice di prova (corse su Databricks comunità ndr): la funzione
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()
UDF:
concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()
Spark Funzione:
spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
Ran entrambe più volte, il in genere l'udf impiegava circa 1,1 - 1,4 s, e la funzione spark concat richiedeva sempre meno di 0,15 s.
risposta Fantastico, proprio quello che stavo cercando. Sospettavo che fosse dovuto al mischiarsi dei dati tra Python-Java, ma non ne ero sicuro. Apprezzo le informazioni aggiuntive che potrebbero trarre vantaggio da Catalyst e Tungsten, quindi sarà molto più importante per me implementarle quanto più possibile nel mio codice e ridurre al minimo le UDF. Un po 'fuori tema, ma ti capiteresti di sapere se le capacità di Numpy arriveranno su Spark Dataframes in qualunque momento? Ciò ha mantenuto uno dei miei progetti in gran parte su RDD. – alfredox
Non sono sicuro di cosa intendi esattamente per "capacità numpy". – zero323
Non è possibile aggiungere una matrice numpy come elemento di riga. Attualmente le righe di Spark supportano diversi tipi di dati come StringType, BoolType, FloatType, ma non è possibile salvare una matrice numpy lì. – alfredox