2013-07-12 8 views
45

ho due RDD di che voglio aderire e hanno questo aspetto:Spark: qual è la migliore strategia per unire un RDD a due chiavi a due chiavi con RDD a chiave singola?

val rdd1:RDD[(T,U)] 
val rdd2:RDD[((T,W), V)] 

Capita di essere il caso che i valori chiave di rdd1 sono unici e anche che i valori tupla-chiave di rdd2 sono unici . Mi piacerebbe unire i due insiemi di dati in modo che ottengo il seguente RDD:

val rdd_joined:RDD[((T,W), (U,V))] 

Qual è il modo più efficace per raggiungere questo obiettivo? Ecco alcune idee a cui ho pensato.

Opzione 1:

val m = rdd1.collectAsMap 
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))}) 

Opzione 2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct 
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2) 

Opzione 1 raccoglierà tutti i dati da padroneggiare, giusto? Quindi non sembra una buona opzione se rdd1 è grande (nel mio caso è relativamente grande, sebbene un ordine di grandezza inferiore a rdd2). L'opzione 2 fa un brutto prodotto distinto e cartesiano, che sembra anche molto inefficiente. Un'altra possibilità che mi è venuta in mente (ma che non ho ancora provato) è quella di fare l'opzione 1 e trasmettere la mappa, anche se sarebbe meglio trasmettere in modo "intelligente" in modo che le chiavi della mappa siano co-localizzate con il chiavi di rdd2.

Qualcuno ha mai incontrato questo tipo di situazione? Sarei felice di avere i tuoi pensieri.

Grazie!

risposta

56

Un'opzione consiste nell'eseguire un collegamento di trasmissione raccogliendo rdd1 nel driver e trasmettendolo a tutti i mappatori; fatto correttamente, questo ci permetterà di evitare una costosa riordino del grande rdd2 RDD:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))) 
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))) 

val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap()) 
val joined = rdd2.mapPartitions({ iter => 
    val m = rdd1Broadcast.value 
    for { 
    ((t, w), u) <- iter 
    if m.contains(t) 
    } yield ((t, w), (u, m.get(t).get)) 
}, preservesPartitioning = true) 

Il preservesPartitioning = true dice scintilla che questa funzione mappa non modifica le chiavi del rdd2; ciò consentirà a Spark di evitare la ripartizione dello rdd2 per le operazioni successive che si uniscono in base alla chiave (t, w).

Questa trasmissione potrebbe essere inefficiente poiché implica un collo di bottiglia per le comunicazioni al conducente. In linea di principio, è possibile trasmettere un RDD a un altro senza coinvolgere il conducente; Ho un prototipo di questo che vorrei generalizzare e aggiungere a Spark.

Un'altra opzione è quella di ri-mappare le chiavi di rdd2 e utilizzare il metodo Spark join; questo comporterà un riordino completo di rdd2 (e possibilmente rdd1):

rdd1.join(rdd2.map { 
    case ((t, w), u) => (t, (w, u)) 
}).map { 
    case (t, (v, (w, u))) => ((t, w), (u, v)) 
}.collect() 

Sul mio input di esempio, entrambi i metodi producono lo stesso risultato:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C))) 

Una terza opzione sarebbe quella di ristrutturare rdd2 in modo che t sia la sua chiave, quindi eseguire il join sopra.

+1

Penso che la seconda opzione sia probabilmente la via più facile da percorrere, anche se la ristrutturazione di rdd2 sarebbe conveniente. – Noah

+0

Dovrò imparare di più sul funzionamento della funzione mapPartitions, ma sembra proprio quello che stavo cercando. Sono anche d'accordo che potrei ri-strutturare 'rdd2' e attraverso una serie di mappe tornare alla cosa originale che volevo. Analizzerò entrambe le opzioni e vedremo come si comportano bene per il mio caso d'uso. Grazie per i suggerimenti! – RyanH

+0

Per la prima opzione, quando provo val rdd1Broadcast = sc.broadcast (rdd1.collectAsMap()) restituisce dati incompleti. C'è un modo per regolare la prima opzione usando collect() invece di collecAsMap()? –

12

Un altro modo per farlo è creare un partizionatore personalizzato e quindi usare zipPartitions per unire i propri RDD.

import org.apache.spark.HashPartitioner 

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) { 

    override def getPartition(key: Any): Int = key match { 
    case k: Tuple2[Int, String] => super.getPartition(k._1) 
    case _ => super.getPartition(key) 
    } 

} 

val numSplits = 8 
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits)) 
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits)) 

val result = rdd2.zipPartitions(rdd1)(
    (iter2, iter1) => { 
    val m = iter1.toMap 
    for { 
     ((t: Int, w), u) <- iter2 
     if m.contains(t) 
     } yield ((t, w), (u, m.get(t).get)) 
    } 
).partitionBy(new HashPartitioner(numSplits)) 

result.glom.collect 
Problemi correlati