Leggermente modificati i dati di input.
val scores = sc.parallelize(Array(
("a", 1),
("a", 2),
("a", 3),
("b", 3),
("b", 1),
("a", 4),
("b", 4),
("b", 2),
("a", 6),
("b", 8)
))
spiego come fare passo per passo:
1.Group da chiave per creare un array
scores.groupByKey().foreach(println)
Risultato:
(b,CompactBuffer(3, 1, 4, 2, 8))
(a,CompactBuffer(1, 2, 3, 4, 6))
Come vedete, ogni il valore stesso è una matrice di numeri. CompactBuffer è solo un array ottimizzato.
2.Per ogni tasto, elenco di ordinamento inverso dei numeri che valore contiene
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)}).foreach(println)
Risultato:
(b,List(8, 4, 3, 2, 1))
(a,List(6, 4, 3, 2, 1))
3.Keep solo i primi 2 elementi della seconda fase, che sarà top 2 punteggi nella lista
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).foreach(println)
Risultato:
(a,List(6, 4))
(b,List(8, 4))
4.Flat mappa per creare nuovi Paired RDD per ogni punteggio chiave e top
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).foreach(println)
Risultato:
(b,8)
(b,4)
(a,6)
(a,4)
5.Optional passo - ordina per chiave se si vuole
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).sortByKey(false).foreach(println)
Risultato:
(a,6)
(a,4)
(b,8)
(b,4)
Speranza, questa spiegazione ha aiutato a comprendere la logica.
Questo non sembra funzionare?Questo è l'output: Array [(String, (Int, Int))] = Array ((a, (4,4)), (b, (4,4))) –
Ho ottenuto che questo funzioni adattando la risposta di user52045 : colonne val = sc.parallelize (Array ( ("a", 1), ("a", 2), ("a", 3), ("b", 3), ("b", 1), ("a", 4), ("b", 4), ("b", 2) )) scores.mapValues (p => (p, p)) .reduceByKey ((u, v) => { val valori = Elenco (u_1, u._2, v._1, v._2) .sorted (Ordinamento [Int] .risposta) .distinto (valori (0), valori (1)) }). Collect() –
@michael_erasmus È corretto che ci sia un bug nel mio codice. Thx per ripararlo. Una cosa devi fare attenzione perché se tutti gli elementi della lista sono uguali si uscirà daOfBoudException. – abalcerek