2009-11-17 16 views
15

I have iteration vals: Iterable[T] e una funzione di lunga durata senza effetti collaterali rilevanti: f: (T => Unit). In questo momento questo viene applicato al vals in modo ovvio:Mappa concorrente/foreach in scala

vals.foreach(f)

Vorrei le chiamate f essere fatto contemporaneamente (entro limiti ragionevoli). Esiste una funzione ovvia da qualche parte nella libreria di base di Scala? Qualcosa di simile:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

Mentre f è ragionevolmente lungo in esecuzione, è abbastanza breve che non voglio che il sovraccarico di invocare un thread per ogni chiamata, quindi sono alla ricerca di qualcosa sulla base di un pool di thread.

risposta

10

mi piace la risposta Futures. Tuttavia, mentre eseguirà contemporaneamente, verrà restituito anche in modo asincrono, che probabilmente non è quello che desideri. L'approccio corretto sarebbe la seguente:

import scala.actors.Futures._ 

vals map { x => future { f(x) } } foreach { _() } 
+8

Fai attenzione che 'vals' è una raccolta rigorosa - se è pigro (e in Scala 2.7 questo include la classe' Range'), i futuri non saranno creati fino a quando ognuno è necessario da 'foreach', e nulla accadrà in parallelo. –

+0

Suppongo che potremmo risolvere questo problema iniettando un'altra chiamata 'foreach' tra' map' e corrente 'foreach'. Quindi: 'vals map {x => future {f (x)}} foreach {x => x} foreach {_()}' –

+0

Questa sarebbe una mappa che dobbiamo iniettare, non un'altra foreach? E non mi è chiaro che la mappa di una collezione pigra sia rigorosa. Il modo più sicuro potrebbe essere quello di chiamare aArray. –

3

Ho avuto alcuni problemi con scala.actors.Futures in Scala 2.8 (era difettoso quando ho controllato). Utilizzo di librerie Java hanno lavorato direttamente per me, però:

final object Parallel { 
    val cpus=java.lang.Runtime.getRuntime().availableProcessors 
    import java.util.{Timer,TimerTask} 
    def afterDelay(ms: Long)(op: =>Unit) = new Timer().schedule(new TimerTask {override def run = op},ms) 
    def repeat(n: Int,f: Int=>Unit) = { 
    import java.util.concurrent._ 
    val e=Executors.newCachedThreadPool //newFixedThreadPool(cpus+1) 
    (0 until n).foreach(i=>e.execute(new Runnable {def run = f(i)})) 
    e.shutdown 
    e.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS) 
    } 
} 
2

userei scala.actors.Futures:

vals.foreach(t => scala.actors.Futures.future(f(t))) 
13

Scalaz ha parMap. Si potrebbe usarlo come segue:

import scalaz.Scalaz._ 
import scalaz.concurrent.Strategy.Naive 

Ciò dotare ogni funtore (compresi Iterable) con un metodo parMap, così appena si può fare:

vals.parMap(f) 

Vedrete anche parFlatMap, parZipWith, etc.

2

L'ultima versione di Functional Java ha alcune funzioni di concorrenza di ordine superiore che è possibile utilizzare.

import fjs.F._ 
import fj.control.parallel.Strategy._ 
import fj.control.parallel.ParModule._ 
import java.util.concurrent.Executors._ 

val pool = newCachedThreadPool 
val par = parModule(executorStrategy[Unit](pool)) 

E poi ...

par.parMap(vals, f) 

Ricordarsi di shutdown il pool.

0

È possibile utilizzare Parallel Collections dalla libreria standard di Scala. Sono come collezioni ordinarie, ma le loro operazioni funzionano in parallelo. Devi solo mettere una chiamata par prima di richiamare alcune operazioni di raccolta.

import scala.collection._ 

val array = new Array[String](10000) 
for (i <- (0 until 10000).par) array(i) = i.toString 
9

Molte delle risposte a partire dal 2009 ancora usare il vecchio scala.actors.Futures._, che non è più nel recente Scala sono. Mentre Akka è il modo preferito, un modo molto più leggibile è usare solo il parallelo (.par) collezioni:

vals.foreach { v => f(v) }

diventa

vals.par.foreach { v => f(v) }

In alternativa, utilizzando parMap può apparire più succinta ma con l'avvertenza che è necessario ricordarsi di importare il solito Scalaz *. Come al solito, c'è più di un modo per fare la stessa cosa in Scala!

+0

Questa dovrebbe essere la risposta accettata –