7

Spark ora offre funzioni predefinite che possono essere utilizzate nei dataframes e sembra che siano altamente ottimizzate. La mia domanda iniziale stava per essere più veloce, ma ho fatto alcuni test e ho trovato che le funzioni spark erano circa 10 volte più veloci almeno in un'istanza. Qualcuno sa perché è così, e quando sarebbe un udf più veloce (solo per le istanze che esiste una funzione scintilla identica)?Funzioni spark vs prestazioni UDF?

Ecco il mio codice di prova (corse su Databricks comunità ndr): la funzione

# UDF vs Spark function 
from faker import Factory 
from pyspark.sql.functions import lit, concat 
fake = Factory.create() 
fake.seed(4321) 

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1) 
from pyspark.sql import Row 
def fake_entry(): 
    name = fake.name().split() 
    return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1) 

# Create a helper function to call a function repeatedly 
def repeat(times, func, *args, **kwargs): 
    for _ in xrange(times): 
     yield func(*args, **kwargs) 
data = list(repeat(500000, fake_entry)) 
print len(data) 
data[0] 

dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age')) 
dataDF.cache() 

UDF:

concat_s = udf(lambda s: s+ 's') 
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name')) 
udfData.count() 

Spark Funzione:

spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name')) 
spfData.count() 

Ran entrambe più volte, il in genere l'udf impiegava circa 1,1 - 1,4 s, e la funzione spark concat richiedeva sempre meno di 0,15 s.

risposta

15

quando sarebbe un'UDF essere più veloce

Se chiedete su Python UDF la risposta è probabilmente mai. Poiché le funzioni SQL sono relativamente semplici e non sono progettate per attività complesse, è praticamente impossibile compensare il costo di ripetute serializzazioni, deserializzazione e spostamento di dati tra l'interprete Python e JVM.

Qualcuno sa perché questo è così

Le ragioni principali sono già descritto sopra può essere ridotto ad un semplice fatto che Spark DataFrame è nativamente una struttura JVM e metodi di accesso standard sono implementati per semplice chiama all'API Java. Le UDF dall'altra parte sono implementate in Python e richiedono lo spostamento dei dati avanti e indietro.

Mentre PySpark in generale richiede movimenti di dati tra JVM e Python, in caso di API RDD di basso livello in genere non richiede costose attività serde. Spark SQL aggiunge costi aggiuntivi di serializzazione e serializzazione, nonché il costo di spostamento dei dati da e verso la rappresentazione non sicura su JVM. La successiva è specifica per tutte le UDF (Python, Scala e Java), ma la prima è specifica per le lingue non native.

A differenza delle UDF, le funzioni di Spark SQL operano direttamente su JVM e sono generalmente ben integrate con Catalyst e Tungsten. Significa che questi possono essere ottimizzati nel piano di esecuzione e la maggior parte del tempo può trarre vantaggio da codgen e da altre ottimizzazioni del tungsteno. Inoltre questi possono operare su dati nella sua rappresentazione "nativa".

Quindi in un certo senso il problema qui è che Python UDF deve portare i dati al codice mentre le espressioni SQL vanno diversamente.

+0

risposta Fantastico, proprio quello che stavo cercando. Sospettavo che fosse dovuto al mischiarsi dei dati tra Python-Java, ma non ne ero sicuro. Apprezzo le informazioni aggiuntive che potrebbero trarre vantaggio da Catalyst e Tungsten, quindi sarà molto più importante per me implementarle quanto più possibile nel mio codice e ridurre al minimo le UDF. Un po 'fuori tema, ma ti capiteresti di sapere se le capacità di Numpy arriveranno su Spark Dataframes in qualunque momento? Ciò ha mantenuto uno dei miei progetti in gran parte su RDD. – alfredox

+0

Non sono sicuro di cosa intendi esattamente per "capacità numpy". – zero323

+0

Non è possibile aggiungere una matrice numpy come elemento di riga. Attualmente le righe di Spark supportano diversi tipi di dati come StringType, BoolType, FloatType, ma non è possibile salvare una matrice numpy lì. – alfredox

0

Dal 30 ottobre 2017, Spark ha appena introdotto i PDF vettorizzati per pyspark.

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

La ragione per cui Python UDF è lento, è probabilmente il PySpark UDF non è implementata in un modo più ottimizzato:

Secondo il paragrafo dal link.

Spark ha aggiunto un'API Python nella versione 0.7, con supporto per funzioni definite dall'utente. Queste funzioni definite dall'utente operano una riga alla volta e pertanto presentano un elevato sovraccarico di serializzazione e invocazione.

Tuttavia i PDF di nuova vectorized sembrano migliorare le prestazioni molto:

che vanno da 3x a più di 100x.

enter image description here