Ho due RDD. Un RDD è compreso tra 5-10 milioni di voci e l'altro RDD è compreso tra 500 milioni e 750 milioni di voci. Ad un certo punto, devo unire questi due rdds usando una chiave comune.Come posso unire efficientemente un grande rdd ad una grande rdd in spark?
val rddA = someData.rdd.map { x => (x.key, x); } // 10-million
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million
var joinRDD = rddA.join(rddB);
Quando la scintilla decide di farlo, decide di fare uno ShuffledHashJoin. Questo fa sì che molti degli elementi in rddB vengano mischiati sulla rete. Allo stesso modo, alcuni di rddA vengono anche mescolati sulla rete. In questo caso, rddA è troppo "grande" da usare come variabile di trasmissione, ma sembra come un BroadcastHashJoin sarebbe più efficiente. C'è da suggerire di scintillare per usare un BroadcastHashJoin? (Apache Flink supporta questo attraverso unire suggerimenti).
In caso contrario, è l'unica opzione per aumentare autoBroadcastJoinThreshold?
Aggiornamento 7/14
mio problema di prestazioni sembra essere esattamente radicata nel partizionamento. Normalmente, una lettura RDD da HDFS sarebbe partizionata per blocco, ma in questo caso la fonte era una fonte di dati parquet [che ho creato]. Quando spark (databricks) scrive il file parquet, scrive un file per partizione e, identicamente, legge una partizione per file. Quindi, la risposta migliore che ho trovato è che durante la produzione dell'origine dati, per suddividerla per chiave, scrivere il lavello per parquet (che è naturalmente co-partizionato) e usarlo come rddB.
La risposta data è corretta, ma penso che i dettagli sull'origine dati parquet potrebbero essere utili a qualcun altro.
Avevo paura che fosse quello che stavi per dire. Ho già provato a utilizzare la partizioneBy - e in pratica devi pagare la penalità in anticipo. Sfortunatamente, "leggo" un file upstream dal RDD e non c'è davvero un buon modo per leggere direttamente in una struttura partizionata, quindi devo partizionare dopo la lettura. Ho giocato con autoBroadcastJoinThreshold, quindi so che funziona, avrei preferito non farlo. Come ho affermato nel PO, questa è un'area in cui Flink offre il controllo che vorrei che Spark facesse. Grazie per la risposta. – Ajaxx
Ho capito che --conf spark.sql.autoBroadcastJoinThreshold si applica solo ai join tra Dataframes o Dataset (Spark SQL). Viene utilizzato anche per i join RDD? Grazie. – leo9r