ho due RDD di che voglio aderire e hanno questo aspetto:Spark: qual è la migliore strategia per unire un RDD a due chiavi a due chiavi con RDD a chiave singola?
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Capita di essere il caso che i valori chiave di rdd1
sono unici e anche che i valori tupla-chiave di rdd2
sono unici . Mi piacerebbe unire i due insiemi di dati in modo che ottengo il seguente RDD:
val rdd_joined:RDD[((T,W), (U,V))]
Qual è il modo più efficace per raggiungere questo obiettivo? Ecco alcune idee a cui ho pensato.
Opzione 1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Opzione 2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
Opzione 1 raccoglierà tutti i dati da padroneggiare, giusto? Quindi non sembra una buona opzione se rdd1 è grande (nel mio caso è relativamente grande, sebbene un ordine di grandezza inferiore a rdd2). L'opzione 2 fa un brutto prodotto distinto e cartesiano, che sembra anche molto inefficiente. Un'altra possibilità che mi è venuta in mente (ma che non ho ancora provato) è quella di fare l'opzione 1 e trasmettere la mappa, anche se sarebbe meglio trasmettere in modo "intelligente" in modo che le chiavi della mappa siano co-localizzate con il chiavi di rdd2
.
Qualcuno ha mai incontrato questo tipo di situazione? Sarei felice di avere i tuoi pensieri.
Grazie!
Penso che la seconda opzione sia probabilmente la via più facile da percorrere, anche se la ristrutturazione di rdd2 sarebbe conveniente. – Noah
Dovrò imparare di più sul funzionamento della funzione mapPartitions, ma sembra proprio quello che stavo cercando. Sono anche d'accordo che potrei ri-strutturare 'rdd2' e attraverso una serie di mappe tornare alla cosa originale che volevo. Analizzerò entrambe le opzioni e vedremo come si comportano bene per il mio caso d'uso. Grazie per i suggerimenti! – RyanH
Per la prima opzione, quando provo val rdd1Broadcast = sc.broadcast (rdd1.collectAsMap()) restituisce dati incompleti. C'è un modo per regolare la prima opzione usando collect() invece di collecAsMap()? –