2015-05-11 17 views
11

dire che ho un PairRDD come tali (ovviamente molto più dati nella vita reale, assumono milioni di record):Spark: Get top N con chiave

val scores = sc.parallelize(Array(
     ("a", 1), 
     ("a", 2), 
     ("a", 3), 
     ("b", 3), 
     ("b", 1), 
     ("a", 4), 
     ("b", 4), 
     ("b", 2) 
)) 

Qual è il modo più efficace per generare un RDD con i primi 2 punteggi per chiave?

val top2ByKey = ... 
res3: Array[(String, Int)] = Array((a,4), (a,3), (b,4), (b,3)) 

risposta

10

credo che questo dovrebbe essere abbastanza efficace:

Redatta secondo i commenti OP:

scores.mapValues(p => (p, p)).reduceByKey((u, v) => { 
    val values = List(u._1, u._2, v._1, v._2).sorted(Ordering[Int].reverse).distinct 
    if (values.size > 1) (values(0), values(1)) 
    else (values(0), values(0)) 
}).collect().foreach(println) 
+0

Questo non sembra funzionare?Questo è l'output: Array [(String, (Int, Int))] = Array ((a, (4,4)), (b, (4,4))) –

+1

Ho ottenuto che questo funzioni adattando la risposta di user52045 : colonne val = sc.parallelize (Array ( ("a", 1), ("a", 2), ("a", 3), ("b", 3), ("b", 1), ("a", 4), ("b", 4), ("b", 2) )) scores.mapValues ​​(p => (p, p)) .reduceByKey ((u, v) => { val valori = Elenco (u_1, u._2, v._1, v._2) .sorted (Ordinamento [Int] .risposta) .distinto (valori (0), valori (1)) }). Collect() –

+1

@michael_erasmus È corretto che ci sia un bug nel mio codice. Thx per ripararlo. Una cosa devi fare attenzione perché se tutti gli elementi della lista sono uguali si uscirà daOfBoudException. – abalcerek

0
scores.reduceByKey(_ + _).map(x => x._2 -> x._1).sortByKey(false).map(x => x._2 -> x._1).take(2).foreach(println) 
+3

Ciao, benvenuto a Stack Overflow. Si prega di non scaricare semplicemente il codice come risposta. Spiega la tua linea di pensiero in modo che possiamo capire meglio. Leggi questo se hai dei dubbi: http://stackoverflow.com/help/how-to-answer Grazie. – Cthulhu

+1

Credo che score.reduceByKey (_ + _) collocherebbe tutte le coppie con la stessa chiave in modo da finire con un singolo (a, N) e un singolo (b, M) dove N e M sono la somma di tutti valori e valori b, rispettivamente. A quel punto si otterrebbe solo una singola (a, N) e nessuna quantità di decomposizione (a, i) e (a, j) dove i e j sono i due valori più alti per tutte le coppie. –

2

Leggermente modificati i dati di input.

val scores = sc.parallelize(Array(
     ("a", 1), 
     ("a", 2), 
     ("a", 3), 
     ("b", 3), 
     ("b", 1), 
     ("a", 4), 
     ("b", 4), 
     ("b", 2), 
     ("a", 6), 
     ("b", 8) 
    )) 

spiego come fare passo per passo:

1.Group da chiave per creare un array

scores.groupByKey().foreach(println) 

Risultato:

(b,CompactBuffer(3, 1, 4, 2, 8)) 
(a,CompactBuffer(1, 2, 3, 4, 6)) 

Come vedete, ogni il valore stesso è una matrice di numeri. CompactBuffer è solo un array ottimizzato.

2.Per ogni tasto, elenco di ordinamento inverso dei numeri che valore contiene

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)}).foreach(println) 

Risultato:

(b,List(8, 4, 3, 2, 1)) 
(a,List(6, 4, 3, 2, 1)) 

3.Keep solo i primi 2 elementi della seconda fase, che sarà top 2 punteggi nella lista

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).foreach(println) 

Risultato:

(a,List(6, 4)) 
(b,List(8, 4)) 

4.Flat mappa per creare nuovi Paired RDD per ogni punteggio chiave e top

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).foreach(println) 

Risultato:

(b,8) 
(b,4) 
(a,6) 
(a,4) 

5.Optional passo - ordina per chiave se si vuole

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).sortByKey(false).foreach(println) 

Risultato:

(a,6) 
(a,4) 
(b,8) 
(b,4) 

Speranza, questa spiegazione ha aiutato a comprendere la logica.