2015-05-19 4 views
10

Si consideri il seguente esempio di esecuzione di un GROUP BY con un numero relativamente elevato di aggregazioni e di un numero relativamente elevato di gruppi:Funzionare Performance & Memoria Problemi con GRUPPO scintilla SQL

import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.SparkContext._ 
val h = new HiveContext(sc) 
import h.implicits._ 

val num_columns = 3e3.toInt 
val num_rows = 1e6.toInt 
val num_groups = 1e5.toInt 

case class Data(A: Long = (math.random*num_groups).toLong) 

val table = (1 to num_rows).map(i => Data()).toDF 

val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i") 
table.registerTempTable("table") 
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a") 

// Write the result to make sure everyting is executed 
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet") 

L'ingresso di questo lavoro è solo 8MB, l'output intorno a 2,4 GB e sto eseguendo questo su un cluster con tre macchine worker con una memoria da 61 GB ciascuna. Il risultato: tutti i lavoratori si bloccano con le eccezioni OutOfMemory. Anche con valori inferiori per num_columns il lavoro diventa irragionevolmente lento a causa dell'overhead del GC.

preferenze abbiamo provato includono:

  • riducendo la dimensione della partizione (riduce l'occupazione di memoria ma aumenta l'overhead contabilità)
  • pre-partizionamento dei dati con una HashPartitioner prima di fare l'aggregazione (riduce il consumo di memoria ma richiede un completo rimpasto prima che avvenga un vero lavoro)

Ci sono modi migliori per ottenere l'effetto desiderato?

+0

Non una risposta, ma gli stack da 61 GB su ciascun worker sono molto grandi e causeranno pause GC molto lunghe. Sarebbe meglio avere più lavoratori più piccoli o dividere ogni server per eseguire più executor con meno heap per ridurre l'impatto di GC. – Rich

+0

Con "riduzione della dimensione della partizione" ci si riferisce alla regolazione di 'spark.sql.shuffle.partitions'? Altrimenti potresti provare ad aumentare quel numero, il valore predefinito è 200. – Rich

+0

@Rich, infatti, questo è il parametro che ho usato. Risolve i problemi di memoria, ma spesso mi vedo costretto ad aumentarlo a valori irragionevolmente alti che influenzano nuovamente le prestazioni a causa del sovraccarico contabile. – DanielM

risposta

2

In generale la soluzione quasi universale ai problemi come questo è quello di mantenere le dimensioni della partizione in dimensioni ragionevoli. Mentre "ragionevole" è leggermente soggettivo e può variare da caso a caso 100-200 MB sembra un buon punto di partenza.

Posso facilmente aggregare dati di esempio forniti su un singolo operatore mantenendo il valore predefinito spark.executor.memory (1 GB) e limitando le risorse totali disponibili a 8 core e 8 GB di RAM. Tutto ciò utilizzando 50 partizioni e mantenendo il tempo di aggregazione di circa 3 secondi senza alcun tunning speciale (questo è più o meno coerente tra 1.5.2 e 2.0.0).

Quindi, per riassumere: aumentare spark.default.parallelism o impostare esplicitamente il numero di partizioni quando si crea DataFrame se possibile. L'impostazione predefinita spark.sql.shuffle.partitions dovrebbe essere sufficiente per un set di dati di piccole dimensioni come questo.

-1

Dato che non sono sicuro del tipo di funzione di aggregazione che si sta utilizzando, è difficile dire cosa fa la scintilla in background. In ogni caso, per avere più controllo per ogni funzione di aggregazione, eseguirò una trasformazione di riduzioneByKey per ognuno sul RDD di base stesso. Quindi, è possibile unire facilmente i risultati, se necessario. in questo modo hai più controllo e puoi vedere quale delle aggregazioni ti ha "costato" di più, inoltre puoi evitare il gruppo per operazione che, oltre allo shuffling, può causare anche problemi di memoria (a causa del movimento di interi set di dati in una singola partizione). di seguito è riportata una breve illustrazione, dove aggrigationFunctions è un elenco delle funzioni di aggregazione con il loro ID e la funzione effettiva (un elenco di tuple).

val aggrigationResults = aggrigationFunctions.map( 
    f => { 
    val aggRes = baseRdd 
        .map(x => (x.[the field to group by], x.[the value to aggrigate])) 
        .reduceByKey(f.func) 
    (f.id, aggRes) 
    } 
) 
+1

Stai mescolando il buon vecchio 'groupByKey' (quello di _Avoid ..._) con le operazioni' Dataset'. Queste cose non sono le stesse. In altre parole 'Dataset.groupBy'! =' RDD.groupBy'. – zero323