2015-01-23 16 views
6

Ho cercato una soluzione per un lungo periodo ma non ho ottenuto alcun algoritmo corretto.Come trasformare RDD [(Chiave, Valore)] in Mappa [Tasto, RDD [Valore]]

Utilizzando RDD Spark in scala, come è possibile trasformare uno RDD[(Key, Value)] in un Map[key, RDD[Value]], sapendo che non è possibile utilizzare collect o altri metodi che potrebbero caricare dati in memoria?

In realtà, il mio obiettivo finale è quello di ciclo sul Map[Key, RDD[Value]] a chiave e chiamare saveAsNewAPIHadoopFile per ogni RDD[Value]

Per esempio, se ottengo:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] 

mi piacerebbe:

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])] 

Mi chiedo se non costerebbe troppo farlo usando filter su ogni tasto A, B, C di RDD[(Key, Value)], ma non so se chiamare il filtro tante volte ci sono chiavi diverse sarebbe efficiente? (Off Certo che no, ma forse usando cache?)

Grazie

+2

"sapendo che non è possibile utilizzare collect o altri metodi che potrebbero caricare dati in memoria?". Questo non ha senso. La mappa risultante dovrà comunque essere inserita nella memoria. –

+0

Solo una pugnalata selvaggia nel buio; non raggruppareBy (...) darti qualcosa che puoi usare? Dovrebbe dare il tuo RDD [tasto, Iterable [valori]] – thoredge

+0

@thoredge Non sono sicuro che un iterabile debba essere in memoria per una grande quantità di dati, ma in effetti secondo il mio volume di input questa potrebbe essere una soluzione – Seb

risposta

2

È consigliabile utilizzare il codice come questo (Python):

rdd = sc.parallelize([("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]).cache() 
keys = rdd.keys().distinct().collect() 
for key in keys: 
    out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y) 
    out.saveAsNewAPIHadoopFile (...) 

Uno RDD non può essere una parte di un altro RDD e si non hanno alcuna opzione per raccogliere solo le chiavi e trasformare i loro valori correlati in un RDD separato. Nel mio esempio si eseguirà l'iterazione sul RDD memorizzato nella cache che è ok e funzionerebbe rapidamente

+0

Non sono sicuro dell'efficienza del filtro, ma penso che questa sia la soluzione che implementerò. – Seb

+0

Non c'è una trasformazione pronta per la tua logica, temo che se vuoi qualcosa di più efficiente devi implementarlo da solo – 0x0FFF

+0

Questa è fondamentalmente una soluzione non ottimale. Puoi soddisfare il suo obiettivo finale di scrivere su un file separato per chiave in un passaggio con un MultipleTextOutput. –

-1

Questo è il mio semplice codice di prova.

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6))) 
val groupby_RDD = test_RDD.groupByKey() 
val result_RDD = groupby_RDD.map{v => 
    var result_list:List[Int] = Nil 
    for (i <- v._2) { 
     result_list ::= i 
    } 
    (v._1, result_list) 
} 

Il risultato è al di sotto

result_RDD.take(3) 
>> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6))) 

Oppure si può fare in questo modo

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6))) 
val nil_list:List[Int] = Nil 
val result2 = test_RDD.aggregateByKey(nil_list)(
    (acc, value) => value :: acc, 
    (acc1, acc2) => acc1 ::: acc2) 

Il risultato è questo

result2.take(3) 
>> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6))) 
0

Suona come quello che si vuole veramente è quello di salvare il tuo RDD KV in un file separato per ciascuno chiave. Piuttosto che creare un Map[Key, RDD[Value]], considera l'utilizzo di un MultipleTextOutputFormatsimilar to the example here. Il codice è praticamente tutto lì nell'esempio.

Il vantaggio di questo approccio è che è garantito solo un passaggio sull'RDD dopo lo shuffle e si ottiene lo stesso risultato desiderato. Se lo facessi filtrando e creando diversi ID come suggerito nell'altra risposta (a meno che i tuoi sorgenti non supportassero i filtri pushdown) finiresti per fare un passaggio sul set di dati per ogni singola chiave che sarebbe molto più lenta.

Problemi correlati