2015-03-05 5 views
17

Ho appena creato l'elenco python di range(1,100000).Spark con python: come risolvere Stage x contiene un'attività di dimensioni molto grandi (xxx KB). La dimensione massima dell'attività consigliata è 100 KB

Uso SparkContext fatto le seguenti operazioni:

a = sc.parallelize([i for i in range(1, 100000)]) 
b = sc.parallelize([i for i in range(1, 100000)]) 

c = a.zip(b) 

>>> [(1, 1), (2, 2), -----] 

sum = sc.accumulator(0) 

c.foreach(lambda (x, y): life.add((y-x))) 

che dà avvertimento come segue:

ARN TaskSetManager: Fase 3 contiene un compito di dimensioni molto grandi (4644 KB). La dimensione massima dell'attività consigliata è 100 KB.

Come risolvere questo avviso? C'è un modo per gestire le dimensioni? E inoltre, influenzerà la complessità temporale dei big data?

+1

Prima di tutto cos'è esattamente la "vita"? Intendevi accumulatore 'somma'? In ogni caso, non dovrebbe essere un problema qui. Vedi [questo] (http://mail-archives.us.apache.org/mod_mbox/spark-user/201407.mbox/%[email protected].com%3E) per i dettagli. – zero323

risposta

2

Espansione @ leo9r commento: considera l'utilizzo non di un python range, ma sc.rangehttps://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.SparkContext.range.

Così si evita il trasferimento di una lista enorme dal proprio autista agli esecutori.

Ovviamente tali RDD vengono solitamente utilizzati solo a scopo di test, quindi non si desidera che vengano trasmessi.

+0

usando 'sc.range' invece di' range' funziona nell'esempio del giocattolo, ma manca il più generale problema (come vengono comunicati i dati tra python e java) – Jealie

7

Spark spedisce in modo nativo una copia di ciascuna variabile durante la spedizione dell'attività. Per le grandi entità di queste variabili si consiglia di utilizzare Broadcast Variables

Se si riscontrano ancora problemi di dimensioni, allora dovrebbero essere forse questo i dati di un RDD in sé

edit: Aggiornato il link

+0

Ciao @Hitesh Dharamdasani, C'è un modo per cambiare i file di configurazione SPARK per massimizzare la dimensione del compito.? – sara

+0

Scusa se sono in ritardo. https://spark.apache.org/docs/1.2.0/tuning.html ha alcune raccomandazioni nella sezione "Serializzazione dei dati". ma generalmente non è raccomandato. Ho avuto un discreto successo con il serializzatore kyro ma non di cui vantarsi. Le variabili di trasmissione sono migliori –

+0

cosa fare se la variabile non è serializzabile (e quindi dovrai usare operatori come 'mapPartitions')? Non penso che possa essere racchiuso all'interno di una variabile di trasmissione. – bachr

1

L'idea generale è che PySpark crei tutti i processi Java di quanti siano gli esecutori e quindi invia i dati a ciascun processo. Se ci sono troppi processi, si verifica un collo di bottiglia nella memoria nello spazio dell'heap java.

Nel tuo caso, l'errore specifico è che la RDD che si è creato con sc.parallelize([...]) non ha specificato il numero di partizioni (argomento numSlices, vedere la docs). E l'RDD imposta automaticamente un numero di partizioni troppo piccolo (probabilmente è costituito da una singola partizione).

Per risolvere questo problema, è sufficiente specificare il numero di partizioni voleva:

a = sc.parallelize([...], numSlices=1000) # and likewise for b 

Come si specifica il numero sempre più alto di fette, si vedrà una diminuzione della dimensione indicato nel messaggio di avviso. Aumentare il numero di fette fino a quando non si ottiene più alcun messaggio di avviso. Ad esempio, ottenere

Stage 0 contains a task of very large size (696 KB). The maximum recommended task size is 100 KB 

significa che è necessario specificare più sezioni.


Un altro suggerimento che può essere utile quando si tratta di problemi di memoria (ma questo non è correlato al messaggio di avviso): per impostazione predefinita, la memoria disponibile per ogni esecutore è di 1 GB o giù di lì. È possibile specificare importi maggiori tramite la riga di comando, ad esempio con --executor-memory 64G.

Problemi correlati