2011-09-10 19 views
5

Stavo giocando con .par e mi chiedo se il seguente calcolo può essere ulteriormente parallelizzato per i guadagni di prestazioni o se ci sono altri modi per calcolare il risultato più velocemente. Non penso che il risultato finale dipenda dall'ordine del gruppo, quindi spero che ci siano ulteriori possibili guadagni.Come parallelizzare groupBy

object Test { 
    val data = (1 to 500000) map { i => (i % 100) -> (i % 10000) } 

    def mutableIndex = { 
    val map = collection.mutable.Map[Int, Set[Int]]().withDefaultValue(
     Set[Int]()) 
    for ((k, v) <- data) { map(k) = map(k) + v } 
    map 
    } 

    def immutableIndex = data.groupBy(_._1).map{ case (k, seq) => 
    k -> seq.map(_._2).toSet 
    } 

    def immutableParIndex = data.par.groupBy(_._1).map{ case (k, seq) => 
    k -> seq.map(_._2).toSet 
    } 

    def main(args: Array[String]) { 
    def bench(id: String)(block: => Unit) { 
     val times = (new testing.Benchmark { def run() = block }).runBenchmark(10) 
     println(id + " " + times + " sum: " + times.sum) 
    } 
    println("avail procs " + Runtime.getRuntime.availableProcessors) 
    bench("mutable"){ mutableIndex } 
    bench("immutable"){ immutableIndex } 
    bench("immutable par"){ immutableParIndex } 
    } 

} 

esecuzione la stampa questo - usando 2.9.1:

$ scalac -d classes -optimize A.scala 
$ scala -cp classes Test 
avail procs 4 
mutable List(718, 343, 296, 297, 312, 312, 312, 312, 312, 312) sum: 3526 
immutable List(312, 266, 266, 265, 265, 265, 265, 265, 249, 265) sum: 2683 
immutable par List(546, 234, 234, 202, 187, 172, 188, 172, 187, 171) sum: 2293 

Alcune note:

  • anche se l'uscita di cui sopra è abbastanza piacevole, la versione parallela è anche molto più incoerente a seconda sulle costanti che uso in data e quante iterazioni configuro in bench (a volte meno efficienti di quelle sequenziali). Mi chiedo se è previsto per le collezioni parallele.
  • mutevole diventa più veloce come l'insieme diventa più piccolo (diminuendo l'ultimo modulo di dati)
  • se il mio punto di riferimento è viziata, fatemi sapere come risolvere il problema (ad esempio, io uso gli stessi dati per tutte le iterazioni, non so se che distorce i risultati)

Edit: qui è una versione basata su hashmap concorrente e sul modello del codice della libreria per groupBy:

def syncIndex = { 
    import collection.mutable.Builder 
    import java.util.concurrent.ConcurrentHashMap 
    import collection.JavaConverters._ 
    val m = new ConcurrentHashMap[Int, Builder[Int, Set[Int]]]().asScala 
    for ((k, v) <- data.par) { 
    val bldr = Set.newBuilder[Int] 
    m.putIfAbsent(k, bldr) match { 
     case Some(bldr) => bldr.synchronized(bldr += v) 
     case None => bldr.synchronized(bldr += v) 
    } 
    } 
    val b = Map.newBuilder[Int, Set[Int]] 
    for ((k, v) <- m) 
    b += ((k, v.result)) 
    b.result 
} 

e seee ms per dare una bella accelerazione su 2 core ma non su 4.

risposta

2

Non proprio una risposta alla tua domanda, ma ho trovato che .par dà un aumento di velocità soprattutto sul client Hotspot (32-bit?), e non così tanto sul server Hotspot. L'ho eseguito nel REPL e il benchmark diventa più veloce nelle esecuzioni successive, dal momento che è già in fase di riscaldamento.

Ho osservato l'utilizzo del processore su Task Manager e per ciascuno, e va dal 54% circa alle attività non parallelizzate al 75% su parallelizzato.

Java 7 offre anche un notevole aumento di velocità.

Benvenuti in Scala versione 2.9.0.1 (Java Client HotSpot (TM), Java 1.6.0_22).

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(1303, 1086, 1058, 1132, 1071, 1068, 1035, 1037, 1036, 1032) sum: 10858 
immutable List(874, 872, 869, 856, 858, 857, 855, 855, 857, 849) sum: 8602 
immutable par List(688, 502, 482, 479, 480, 465, 473, 473, 471, 472) sum: 4985 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(1015, 1025, 1090, 1026, 1011, 1021, 1014, 1017, 1011, 1015) sum: 10245 
immutable List(863, 868, 867, 865, 864, 883, 865, 863, 864, 864) sum: 8666 
immutable par List(466, 468, 463, 466, 466, 469, 470, 467, 478, 467) sum: 4680 

Benvenuti alla Scala versione 2.9.0.1 (Java HotSpot (TM) a 64 bit di server VM, Java 1.6.0_22).

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(841, 360, 348, 338, 337, 338, 338, 342, 336, 336) sum: 3914 
immutable List(320, 303, 302, 300, 304, 302, 305, 299, 305, 299) sum: 3039 
immutable par List(521, 284, 244, 244, 232, 267, 209, 219, 231, 203) sum: 2654 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(370, 393, 351, 342, 336, 343, 342, 340, 334, 340) sum: 3491 
immutable List(301, 301, 302, 305, 300, 299, 303, 305, 304, 301) sum: 3021 
immutable par List(207, 240, 201, 194, 204, 194, 197, 211, 207, 208) sum: 2063 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(334, 336, 338, 339, 340, 338, 341, 334, 336, 340) sum: 3376 
immutable List(300, 303, 297, 301, 298, 305, 302, 304, 296, 296) sum: 3002 
immutable par List(194, 200, 190, 201, 192, 191, 195, 196, 202, 189) sum: 1950 

Benvenuti alla Scala versione 2.9.0.1 (Java HotSpot (TM) a 64 bit di server VM, Java 1.7.0).

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(763, 258, 227, 235, 238, 279, 245, 227, 227, 243) sum: 2942 
immutable List(274, 233, 228, 235, 238, 247, 243, 229, 233, 245) sum: 2405 
immutable par List(635, 303, 261, 258, 217, 291, 204, 248, 219, 184) sum: 2820 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(229, 229, 229, 230, 234, 226, 227, 227, 227, 232) sum: 2290 
immutable List(228, 247, 231, 234, 210, 210, 209, 211, 210, 210) sum: 2200 
immutable par List(173, 209, 160, 157, 158, 177, 179, 164, 163, 159) sum: 1699 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(222, 218, 216, 214, 216, 215, 215, 219, 219, 218) sum: 2172 
immutable List(211, 210, 211, 211, 212, 215, 215, 210, 211, 210) sum: 2116 
immutable par List(161, 158, 168, 158, 156, 161, 150, 156, 163, 175) sum: 1606 
+0

interessante, dovrò provare i tempi con 1.7.0. Ho aggiunto una versione utilizzando una hashmap concomitante nella domanda. È stato più veloce su 2 core ma più lento su 4. Sono curioso di vedere cosa farà in 1.7.0. Inoltre ho notato che a volte in REPL ho ottenuto risultati più veloci rispetto a quando eseguivo il codice compilato! – huynhjl

+0

@huynhjl Penso che la VM che esegue il tuo REPL si scaldi in una certa misura indipendentemente da ciò che viene eseguito su di essa, e quindi sia più veloce di avviare una nuova VM per fare il benchmark dal freddo. Puoi mostrarlo eseguendo un benchmark sul REPL, quindi creando un nuovo oggetto Test ed eseguendolo: per me le tempistiche erano più veloci rispetto alla prima istanza. Inoltre, dovresti provare ad aumentare la memoria disponibile della VM. Il primo benchmark che ho fatto su un REPL "vecchio" era come 10 volte più lento, prima di andare in crash con un errore di memoria insufficiente. –

+0

Passeremo all'implementazione 'groupBy' per utilizzare il nuovo Concurrent Hash Tries, probabilmente nella prossima versione. Questo dovrebbe aumentare la scalabilità. – axel22

0

Il consiglio generale è di usare pinza per microbecnhmarking: https://github.com/sirthias/scala-benchmarking-template

Inoltre, essere consapevoli del fatto che a volte par esegue la copia della struttura iniziale (almeno in 2.9.1, vedere https://issues.scala-lang.org/browse/SI-4984), ad esempio,

`

scala> val data = (1L to 50000000) par (100) 
java.lang.OutOfMemoryError: Java heap space 
     at scala.math.Integral$class.mkNumericOps(Integral.scala:25) 
     at scala.math.Numeric$LongIsIntegral$.mkNumericOps(Numeric.scala:115) 
     at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:75) 
     at scala.collection.Parallelizable$class.par(Parallelizable.scala:41) 
     at scala.collection.immutable.NumericRange.par(NumericRange.scala:42) 

`

+0

Link non disponibile. – huynhjl

+0

Il metodo 'par' non dovrebbe copiare i dati per la mappa immutabile predefinita - il' immutable.HashMap' - che è il tipo di 'data'. – axel22