2015-09-11 14 views
7

Sto eseguendo un lavoro Spark per aggregare i dati. Ho una struttura dati personalizzata chiamata Profile, che in pratica contiene uno mutable.HashMap[Zone, Double]. Voglio unire tutti i profili che condividono una data chiave (un UUID), con il seguente codice:Scopo di 'spark.driver.maxResultSize'

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1} 
val aggregated = dailyProfiles 
    .aggregateByKey(new Profile(), 3200)(merge, merge).cache() 

Curiosamente, Spark non riesce con il seguente errore:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

La soluzione più ovvia è quella di incrementare "spark.driver.maxResultSize", ma due cose mi confondono.

  1. Troppo di una coincidenza che ricevo 1.024,0 superiore a 1.024,0
  2. Tutta la documentazione e la guida che ho trovato googling questo particolare parametro di errore e di configurazione indica che influenzano le funzioni che prendono un valore al conducente. (per esempio, take() o collect()), ma non prendo NULLA al driver, basta leggere da HDFS, aggregare, salvare su HDFS.

Qualcuno sa perché sto ricevendo questo errore?

+0

si potrebbe verificare la mia risposta? – mrsrinivas

+0

Lo sveglierò, ma purtroppo non ho più accesso a quel codice (o azienda), né la tua risposta risolve il pezzo n. 2, ovvero che l'operazione non dovrebbe avvenire in primo luogo: -S –

risposta

1

Yes, It's failing because The values we see in exception message are rounded off by one precision and comparison happening in bytes.

That serialized output must be more than 1024.0 MB and less than 1024.1 MB.

Controllare aggiunto snippet di codice Apache Spark, È molto interessante e molto raro trovare questo errore. :)

Qui totalResultSize > maxResultSize entrambi sono di tipo Long e contengono il valore in byte. Ma msg contiene il valore arrotondato da Utils.bytesToString().

//TaskSetManager.scala 
    def canFetchMoreResults(size: Long): Boolean = sched.synchronized { 
    totalResultSize += size 
    calculatedTasks += 1 
    if (maxResultSize > 0 && totalResultSize > maxResultSize) { 
     val msg = s"Total size of serialized results of ${calculatedTasks} tasks " + 
     s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " + 
     s"(${Utils.bytesToString(maxResultSize)})" 
     logError(msg) 
     abort(msg) 
     false 
    } else { 
     true 
    } 
    } 

Apache Spark 1.3 - source


//Utils.scala 
    def bytesToString(size: Long): String = { 
    val TB = 1L << 40 
    val GB = 1L << 30 
    val MB = 1L << 20 
    val KB = 1L << 10 

    val (value, unit) = { 
     if (size >= 2*TB) { 
     (size.asInstanceOf[Double]/TB, "TB") 
     } else if (size >= 2*GB) { 
     (size.asInstanceOf[Double]/GB, "GB") 
     } else if (size >= 2*MB) { 
     (size.asInstanceOf[Double]/MB, "MB") 
     } else if (size >= 2*KB) { 
     (size.asInstanceOf[Double]/KB, "KB") 
     } else { 
     (size.asInstanceOf[Double], "B") 
     } 
    } 
    "%.1f %s".formatLocal(Locale.US, value, unit) 
    } 

Apache Spark 1.3 - source