Sto eseguendo un lavoro Spark per aggregare i dati. Ho una struttura dati personalizzata chiamata Profile, che in pratica contiene uno mutable.HashMap[Zone, Double]
. Voglio unire tutti i profili che condividono una data chiave (un UUID), con il seguente codice:Scopo di 'spark.driver.maxResultSize'
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
.aggregateByKey(new Profile(), 3200)(merge, merge).cache()
Curiosamente, Spark non riesce con il seguente errore:
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
La soluzione più ovvia è quella di incrementare "spark.driver.maxResultSize", ma due cose mi confondono.
- Troppo di una coincidenza che ricevo 1.024,0 superiore a 1.024,0
- Tutta la documentazione e la guida che ho trovato googling questo particolare parametro di errore e di configurazione indica che influenzano le funzioni che prendono un valore al conducente. (per esempio,
take()
ocollect()
), ma non prendo NULLA al driver, basta leggere da HDFS, aggregare, salvare su HDFS.
Qualcuno sa perché sto ricevendo questo errore?
si potrebbe verificare la mia risposta? – mrsrinivas
Lo sveglierò, ma purtroppo non ho più accesso a quel codice (o azienda), né la tua risposta risolve il pezzo n. 2, ovvero che l'operazione non dovrebbe avvenire in primo luogo: -S –