2015-07-13 19 views
8

Ho due RDD. Un RDD è compreso tra 5-10 milioni di voci e l'altro RDD è compreso tra 500 milioni e 750 milioni di voci. Ad un certo punto, devo unire questi due rdds usando una chiave comune.Come posso unire efficientemente un grande rdd ad una grande rdd in spark?

val rddA = someData.rdd.map { x => (x.key, x); } // 10-million 
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million 
var joinRDD = rddA.join(rddB); 

Quando la scintilla decide di farlo, decide di fare uno ShuffledHashJoin. Questo fa sì che molti degli elementi in rddB vengano mischiati sulla rete. Allo stesso modo, alcuni di rddA vengono anche mescolati sulla rete. In questo caso, rddA è troppo "grande" da usare come variabile di trasmissione, ma sembra come un BroadcastHashJoin sarebbe più efficiente. C'è da suggerire di scintillare per usare un BroadcastHashJoin? (Apache Flink supporta questo attraverso unire suggerimenti).

In caso contrario, è l'unica opzione per aumentare autoBroadcastJoinThreshold?

Aggiornamento 7/14

mio problema di prestazioni sembra essere esattamente radicata nel partizionamento. Normalmente, una lettura RDD da HDFS sarebbe partizionata per blocco, ma in questo caso la fonte era una fonte di dati parquet [che ho creato]. Quando spark (databricks) scrive il file parquet, scrive un file per partizione e, identicamente, legge una partizione per file. Quindi, la risposta migliore che ho trovato è che durante la produzione dell'origine dati, per suddividerla per chiave, scrivere il lavello per parquet (che è naturalmente co-partizionato) e usarlo come rddB.

La risposta data è corretta, ma penso che i dettagli sull'origine dati parquet potrebbero essere utili a qualcun altro.

risposta

16

È possibile partizionare RDD con lo stesso divisore, in questo caso le partizioni con la stessa chiave verranno collocate sullo stesso esecutore.

In questo caso si evita la riproduzione casuale delle operazioni di unione.

Shuffle potrà avvenire solo una volta, quando si aggiorna parititoner, e se sei in cache RDD è tutti i join dopo che dovrebbe essere locale per esecutori

import org.apache.spark.SparkContext._ 

class A 
class B 

val rddA: RDD[(String, A)] = ??? 
val rddB: RDD[(String, B)] = ??? 

val partitioner = new HashPartitioner(1000) 

rddA.partitionBy(partitioner).cache() 
rddB.partitionBy(partitioner).cache() 

Inoltre si può provare ad aggiornare dimensione di soglia di trasmissione, forse rddA può essere trasmesso:

--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb 

Utilizziamo 400mb per i join di trasmissione e funziona bene.

+0

Avevo paura che fosse quello che stavi per dire. Ho già provato a utilizzare la partizioneBy - e in pratica devi pagare la penalità in anticipo. Sfortunatamente, "leggo" un file upstream dal RDD e non c'è davvero un buon modo per leggere direttamente in una struttura partizionata, quindi devo partizionare dopo la lettura. Ho giocato con autoBroadcastJoinThreshold, quindi so che funziona, avrei preferito non farlo. Come ho affermato nel PO, questa è un'area in cui Flink offre il controllo che vorrei che Spark facesse. Grazie per la risposta. – Ajaxx

+2

Ho capito che --conf spark.sql.autoBroadcastJoinThreshold si applica solo ai join tra Dataframes o Dataset (Spark SQL). Viene utilizzato anche per i join RDD? Grazie. – leo9r

Problemi correlati