2014-11-20 12 views
15

Quando si riduce il numero di partizioni si può usare coalesce, che è ottimo perché non causa uno shuffle e sembra funzionare immediatamente (non richiede una fase di lavoro aggiuntiva).Spark: aumentare il numero di partizioni senza causare un shuffle?

Mi piacerebbe fare l'opposto a volte, ma repartition induce un shuffle. Penso che alcuni mesi fa ho effettivamente funzionato usando CoalescedRDD con balanceSlack = 1.0 - quindi cosa succederebbe sarebbe dividere una partizione in modo che la partizione risultante posizione in cui tutti sullo stesso nodo (così piccolo IO rete).

Questo tipo di funzionalità è automatico in Hadoop, basta modificare le dimensioni divise. Non sembra funzionare in questo modo in Spark a meno che non si stia riducendo il numero di partizioni. Penso che la soluzione potrebbe essere quella di scrivere un partizionatore personalizzato insieme a un RDD personalizzato in cui definiamo getPreferredLocations ... ma ho pensato che è una cosa così semplice e comune da fare sicuramente ci deve essere un modo semplice per farlo?

Cose cercato:

.set("spark.default.parallelism", partitions) sul mio SparkConf, e quando nel contesto della lettura parquet ho provato sqlContext.sql("set spark.sql.shuffle.partitions= ..., che sulla 1.0.0 provoca un errore e non vuole veramente che voglio, voglio partizione numero da cambiare tra tutti i tipi di lavoro, non solo shuffle.

+0

Qualche fortuna di trovare una soluzione per questo? – nbubis

risposta

0

Non capisco esattamente qual è il tuo punto. Vuoi dire che hai ora 5 partizioni, ma dopo l'operazione successiva vuoi i dati distribuiti a 10? Perché avere 10, ma usare ancora 5 non ha molto senso ... Il processo di invio dei dati a nuove partizioni deve avvenire qualche volta.

Quando si esegue coalesce, è possibile eliminare partizioni non desiderate, ad esempio: se inizialmente si era 100, ma dopo aver ridottoByKey si ottiene 10 (come là dove solo 10 chiavi), è possibile impostare coalesce.

Se si desidera che il processo di andare nella direzione opposta, si può solo imporre un qualche tipo di partizionamento:

[RDD].partitionBy(new HashPartitioner(100)) 

io non sono sicuro che è quello che stai cercando, ma spero così.

+3

Ogni partizione ha una posizione, ovvero un nodo, supponiamo di avere 5 partizioni e 5 nodi. Se chiamo 'ripartizione', o il tuo codice, a 10 partizioni, questo rimescola i dati - cioè i dati per ciascuno dei 5 nodi possono passare sulla rete su altri nodi. Quello che voglio è che Spark divida semplicemente ogni partizione in 2 senza spostare alcun dato - questo è ciò che accade in Hadoop quando si modificano le impostazioni divise. – samthebest

+0

Non sono sicuro che tu possa farlo. Immagino che avresti bisogno di una sorta di funzione '.forEachNode'. Ma non ho mai visto nulla di simile. E non sono sicuro che possa essere facilmente implementato. Il partizionatore deve restituire la stessa partizione per lo stesso oggetto ogni volta. Per impostazione predefinita, Spark usa 'HashPartitioner', che ** fahCode modulo number_of_partitions **. Se si suddividessero i dati in due nuove partizioni, finirebbero per non finire nelle loro posizioni. Ecco perché lo shuffle è necessario. Forse se hai il tuo partizionatore, potrebbe aumentare il numero di partizioni senza mischiare su rete. – szefuf

Problemi correlati