2015-06-26 11 views
11

I documenti API Apache Spark pyspark.RDD menzionano che groupByKey() è inefficiente. Invece, si consiglia di utilizzare reduceByKey(), aggregateByKey(), combineByKey() o foldByKey(). Ciò comporterà una parte dell'aggregazione nei lavoratori prima del rimescolamento, riducendo così il mescolamento dei dati tra i lavoratori.Apache Spark: Qual è l'implementazione equivalente di RDD.groupByKey() utilizzando RDD.aggregateByKey()?

Dato il seguente set di dati e l'espressione groupByKey(), che cosa è un'implementazione equivalente ed efficiente (riduzione dei dati intercambiabili) che non utilizza groupByKey(), ma fornisce lo stesso risultato?

dataset = [("a", 7), ("b", 3), ("a", 8)] 
rdd = (sc.parallelize(dataset) 
     .groupByKey()) 
print sorted(rdd.mapValues(list).collect()) 

uscita:

[('a', [7, 8]), ('b', [3])] 
+0

I dati sono suddivisi in modo casuale o per chiave? Se puoi assicurarti che tutti i record con a._1 = "a" si trovino sulla stessa partizione, puoi accelerare drasticamente le cose - potresti essere in grado di cavartela senza bisogno di shuffle, oltre a quelli necessari per il partizionamento iniziale . Forse provare a utilizzare un partizionatore hash? –

risposta

18

quanto riguarda Posso dire che non c'è niente da guadagnare * in questo caso particolare usando aggregateByKey o una funzione simile. Dato che stai costruendo un elenco non c'è una riduzione "reale" e la quantità di dati che deve essere mescolata è più o meno la stessa.

Per osservare davvero alcuni guadagni di prestazioni sono necessarie trasformazioni che riducono effettivamente la quantità di dati trasferiti per esempio il conteggio, il calcolo delle statistiche di riepilogo, la ricerca di elementi unici.

Per quanto riguarda i vantaggi delle differenze dell'utilizzo di reduceByKey(), combineByKey() o foldByKey(), c'è un'importante differenza concettuale che è più facile da vedere quando si considerano le API di Scala.

Entrambi reduceByKey e foldByKey mappa da RDD[(K, V)] a RDD[(K, V)] mentre il secondo fornisce un ulteriore elemento zero.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)] 

combineByKey (non c'è aggregateByKey, ma è lo stesso tipo di trasformazione) trasforma RDD[(K, V)]-RDD[(K, C)]:

combineByKey[C](
    createCombiner: (V) ⇒ C, 
    mergeValue: (C, V) ⇒ C, 
    mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Tornando al tuo esempio solo combineByKey (e in PySpark aggregateByKey) è realmente applicabile poiché si sta trasformando da RDD[(String, Int)] a RDD[(String, List[Int])].

Mentre in un linguaggio dinamico come Python in realtà è possibile eseguire tale operazione utilizzando foldByKey o reduceByKey rende la semantica del codice poco chiaro e per citare @ tim-Peters "Ci dovrebbe essere tra-- e, preferibilmente, una sola - Modo ovvio per farlo "[1].

Differenza tra aggregateByKey e combineByKey è praticamente la stessa che c'è tra reduceByKey e foldByKey così per una lista è soprattutto una questione di gusto:

def merge_value(acc, x): 
    acc.append(x) 
    return acc 

def merge_combiners(acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
    .combineByKey(
     lambda x: [x], 
     lambda u, v: u + [v], 
     lambda u1,u2: u1+u2)) 

In pratica si dovrebbe preferire groupByKey però. L'implementazione di PySpark è notevolmente più ottimizzata rispetto all'implementazione ingenua come quella sopra descritta.

1.Peters, T. PEP 20 - Lo Zen di Python. (2004). a https://www.python.org/dev/peps/pep-0020/


* In pratica v'è in realtà un bel po 'da perdere qui, soprattutto quando si utilizza PySpark. L'implementazione Python di groupByKey è notevolmente più ottimizzata rispetto alla combinazione ingenuo per chiave. È possibile controllare Be Smart About groupByKey, creato da me e @eliasah per una discussione aggiuntiva.

+0

Se si utilizza un partizionatore (ad esempio, una partizione con un hash della chiave), si può andare via senza bisogno di altri shuffles? –

+0

@GlennStrycker Per quanto ne so, la risposta è positiva. Se RDD è partizionato per chiave, tutti i valori per una determinata chiave dovrebbero essere elaborati localmente su un singolo nodo. Il problema possibile è però una distribuzione distorta delle chiavi. – zero323

3

Qui è un'opzione che utilizza aggregateByKey(). Sarei curioso di sapere come farlo utilizzando reduceByKey(), combineByKey() o foldByKey(), e quale costo/beneficio c'è per ogni alternativa.

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: u+[v], 
         lambda u1,u2: u1+u2)) 
print sorted(rdd.mapValues(list).collect()) 

uscita:

[('a', [7, 8]), ('b', [3])] 

La seguente è una memoria attuazione efficiente leggermente più, anche se meno leggibile al novizio pitone, che produce lo stesso risultato:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: itertools.chain(u,[v]), 
         lambda u1,u2: itertools.chain(u1,u2))) 
print sorted(rdd.mapValues(list).collect())