2015-11-20 9 views
14

Sto lavorando a questi due concetti proprio ora e vorrei una certa chiarezza. Dal lavoro attraverso la riga di comando, ho cercato di identificare le differenze e quando uno sviluppatore avrebbe usato la ripartizione vs partitionBy.Pyspark: ripartizione vs partizioneBy

Ecco alcuni esempi di codice:

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c',1), ('ef',5)]) 
rdd1 = rdd.repartition(4) 
rdd2 = rdd.partitionBy(4) 

rdd1.glom().collect() 
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]] 

rdd2.glom().collect() 
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]] 

ho preso uno sguardo alla realizzazione di entrambi, e l'unica differenza che ho notato per la maggior parte è che partitionBy può assumere una funzione di partizionamento, o utilizzando il portable_hash per impostazione predefinita. Quindi in partitionBy, tutte le stesse chiavi dovrebbero essere nella stessa partizione. Nella ripartizione, mi aspetto che i valori siano distribuiti in modo più uniforme sulle partizioni, ma questo non è il caso.

Dato questo, perché qualcuno dovrebbe mai utilizzare la ripartizione? Suppongo che l'unica volta in cui potrei vederlo essere usato è se non sto lavorando con PairRDD, o ho una grande inclinazione dei dati?

C'è qualcosa che mi manca, o qualcuno potrebbe far luce da un angolo diverso per me?

risposta

7

repartition esiste già in RDD e non gestisce il partizionamento tramite chiave (o con qualsiasi altro criterio tranne Ordinamento). Ora PairRDD aggiunge la nozione di chiavi e successivamente aggiunge un altro metodo che consente di partizionare con quella chiave.

Quindi sì, se i tuoi dati sono codificati, dovresti assolutamente partizionare con quella chiave, che in molti casi è il punto di usare un PairRDD in primo luogo (per join, reduceByKey e così via).

+1

Qual è la ragione per cui ripartizione non distribuisce gli elementi in modo uniforme ish attraverso le partizioni? Potrebbe essere un caso in cui non ho abbastanza dati e stiamo riscontrando un piccolo problema di dimensioni del campione? –

+0

Buona domanda, sto vedendo una distribuzione uniforme durante il tentativo (in Scala). –

+0

@JoeWiden Nient'altro che una semplice probabilità. 'ripartizione' sta in realtà utilizzando la coppia RDD internamente aggiungendo la chiave casuale ai valori esistenti in modo da non fornire forti garanzie sulla distribuzione dei dati di output. BTW Probabilmente dovresti accettare la risposta. – zero323

6

ripartizione() viene utilizzato per specificare il numero di partizioni considerando il numero di core e la quantità di dati che si hanno.

partitionBy() è utilizzato per rendere più efficienti le funzioni di shuffling, come ad esempio reduceByKey(), join(), cogroup() ecc. È utile solo nei casi in cui un RDD viene utilizzato per più volte, quindi è di solito seguito da persist().

differenze tra i due in azione:

pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x)) 

pairs.partitionBy(3).glom().collect() 
[[(3, 3), (6, 6), (6, 6)], 
[(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)], 
[(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]] 

pairs.repartition(3).glom().collect() 
[[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)], 
[(1, 1), (4, 4), (6, 6), (4, 4)], 
[(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]