Stavo giocando con .par
e mi chiedo se il seguente calcolo può essere ulteriormente parallelizzato per i guadagni di prestazioni o se ci sono altri modi per calcolare il risultato più velocemente. Non penso che il risultato finale dipenda dall'ordine del gruppo, quindi spero che ci siano ulteriori possibili guadagni.Come parallelizzare groupBy
object Test {
val data = (1 to 500000) map { i => (i % 100) -> (i % 10000) }
def mutableIndex = {
val map = collection.mutable.Map[Int, Set[Int]]().withDefaultValue(
Set[Int]())
for ((k, v) <- data) { map(k) = map(k) + v }
map
}
def immutableIndex = data.groupBy(_._1).map{ case (k, seq) =>
k -> seq.map(_._2).toSet
}
def immutableParIndex = data.par.groupBy(_._1).map{ case (k, seq) =>
k -> seq.map(_._2).toSet
}
def main(args: Array[String]) {
def bench(id: String)(block: => Unit) {
val times = (new testing.Benchmark { def run() = block }).runBenchmark(10)
println(id + " " + times + " sum: " + times.sum)
}
println("avail procs " + Runtime.getRuntime.availableProcessors)
bench("mutable"){ mutableIndex }
bench("immutable"){ immutableIndex }
bench("immutable par"){ immutableParIndex }
}
}
esecuzione la stampa questo - usando 2.9.1:
$ scalac -d classes -optimize A.scala
$ scala -cp classes Test
avail procs 4
mutable List(718, 343, 296, 297, 312, 312, 312, 312, 312, 312) sum: 3526
immutable List(312, 266, 266, 265, 265, 265, 265, 265, 249, 265) sum: 2683
immutable par List(546, 234, 234, 202, 187, 172, 188, 172, 187, 171) sum: 2293
Alcune note:
- anche se l'uscita di cui sopra è abbastanza piacevole, la versione parallela è anche molto più incoerente a seconda sulle costanti che uso in
data
e quante iterazioni configuro inbench
(a volte meno efficienti di quelle sequenziali). Mi chiedo se è previsto per le collezioni parallele. - mutevole diventa più veloce come l'insieme diventa più piccolo (diminuendo l'ultimo modulo di dati)
- se il mio punto di riferimento è viziata, fatemi sapere come risolvere il problema (ad esempio, io uso gli stessi dati per tutte le iterazioni, non so se che distorce i risultati)
Edit: qui è una versione basata su hashmap concorrente e sul modello del codice della libreria per groupBy
:
def syncIndex = {
import collection.mutable.Builder
import java.util.concurrent.ConcurrentHashMap
import collection.JavaConverters._
val m = new ConcurrentHashMap[Int, Builder[Int, Set[Int]]]().asScala
for ((k, v) <- data.par) {
val bldr = Set.newBuilder[Int]
m.putIfAbsent(k, bldr) match {
case Some(bldr) => bldr.synchronized(bldr += v)
case None => bldr.synchronized(bldr += v)
}
}
val b = Map.newBuilder[Int, Set[Int]]
for ((k, v) <- m)
b += ((k, v.result))
b.result
}
e seee ms per dare una bella accelerazione su 2 core ma non su 4.
interessante, dovrò provare i tempi con 1.7.0. Ho aggiunto una versione utilizzando una hashmap concomitante nella domanda. È stato più veloce su 2 core ma più lento su 4. Sono curioso di vedere cosa farà in 1.7.0. Inoltre ho notato che a volte in REPL ho ottenuto risultati più veloci rispetto a quando eseguivo il codice compilato! – huynhjl
@huynhjl Penso che la VM che esegue il tuo REPL si scaldi in una certa misura indipendentemente da ciò che viene eseguito su di essa, e quindi sia più veloce di avviare una nuova VM per fare il benchmark dal freddo. Puoi mostrarlo eseguendo un benchmark sul REPL, quindi creando un nuovo oggetto Test ed eseguendolo: per me le tempistiche erano più veloci rispetto alla prima istanza. Inoltre, dovresti provare ad aumentare la memoria disponibile della VM. Il primo benchmark che ho fatto su un REPL "vecchio" era come 10 volte più lento, prima di andare in crash con un errore di memoria insufficiente. –
Passeremo all'implementazione 'groupBy' per utilizzare il nuovo Concurrent Hash Tries, probabilmente nella prossima versione. Questo dovrebbe aumentare la scalabilità. – axel22