Si consideri il seguente esempio di esecuzione di un GROUP BY
con un numero relativamente elevato di aggregazioni e di un numero relativamente elevato di gruppi:Funzionare Performance & Memoria Problemi con GRUPPO scintilla SQL
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
L'ingresso di questo lavoro è solo 8MB, l'output intorno a 2,4 GB e sto eseguendo questo su un cluster con tre macchine worker con una memoria da 61 GB ciascuna. Il risultato: tutti i lavoratori si bloccano con le eccezioni OutOfMemory. Anche con valori inferiori per num_columns
il lavoro diventa irragionevolmente lento a causa dell'overhead del GC.
preferenze abbiamo provato includono:
- riducendo la dimensione della partizione (riduce l'occupazione di memoria ma aumenta l'overhead contabilità)
- pre-partizionamento dei dati con una HashPartitioner prima di fare l'aggregazione (riduce il consumo di memoria ma richiede un completo rimpasto prima che avvenga un vero lavoro)
Ci sono modi migliori per ottenere l'effetto desiderato?
Non una risposta, ma gli stack da 61 GB su ciascun worker sono molto grandi e causeranno pause GC molto lunghe. Sarebbe meglio avere più lavoratori più piccoli o dividere ogni server per eseguire più executor con meno heap per ridurre l'impatto di GC. – Rich
Con "riduzione della dimensione della partizione" ci si riferisce alla regolazione di 'spark.sql.shuffle.partitions'? Altrimenti potresti provare ad aumentare quel numero, il valore predefinito è 200. – Rich
@Rich, infatti, questo è il parametro che ho usato. Risolve i problemi di memoria, ma spesso mi vedo costretto ad aumentarlo a valori irragionevolmente alti che influenzano nuovamente le prestazioni a causa del sovraccarico contabile. – DanielM