2010-11-08 17 views
5

Questo sta usando gli attori Scala 2.8. Ho un lavoro di lunga durata che può essere parallelizzato. Consiste di circa 650.000 unità di lavoro. Mi divido in 2600 diverse attività secondarie separati, e per ciascuno di essi a creare un nuovo attore:Come prevenire la fame degli attori in presenza di altri attori di lunga durata?

actor { 
    val range = (0L to total by limit) 
    val latch = new CountDownLatch(range.length) 
    range.foreach { offset => 
    actor { 
     doExpensiveStuff(offset,limit) 
     latch.countDown 
    } 
    } 
    latch.await 
} 

Questo funziona abbastanza bene, ma prende il grado 2 + h per completare. Il problema è che nel frattempo, tutti gli altri attori che creo per svolgere compiti normali sembrano essere affamati dai primi 2600 attori che sono anche pazientemente in attesa del loro tempo per essere eseguiti su un thread, ma hanno aspettato più a lungo di qualsiasi nuovo attore che Vieni.

Come potrei fare per evitare questa fame?

pensieri iniziali:

  • Invece di 2600 attori, utilizzare un attore che sequenziale aratri attraverso il grande mucchio di lavoro. Non mi piace questo perché mi piacerebbe che questo lavoro finisse prima dividerlo.
  • Invece di 2600 attori, utilizzare due attori, ognuno dei quali elabora una metà diversa del set di lavoro totale. Questo potrebbe funzionare meglio, ma cosa succede se la mia macchina ha 8 core? Probabilmente vorrei utilizzare di più.

UPDATE

Alcune persone hanno messo in dubbio l'uso di attori a tutti, tanto più che la capacità di passare un messaggio non veniva utilizzato entro i lavoratori. Avevo ipotizzato che l'attore fosse un'astrazione molto leggera attorno a un ThreadPool allo stesso livello di prestazioni o quasi lo stesse semplicemente codificando manualmente l'esecuzione basata su ThreadPool. Così ho scritto un po 'di riferimento:

import testing._ 
import java.util.concurrent._ 
import actors.Futures._ 

val count = 100000 
val poolSize = 4 
val numRuns = 100 

val ActorTest = new Benchmark { 
    def run = { 
    (1 to count).map(i => future { 
     i * i 
    }).foreach(_()) 
    } 
} 

val ThreadPoolTest = new Benchmark { 
    def run = { 
    val queue = new LinkedBlockingQueue[Runnable] 
    val pool = new ThreadPoolExecutor(
      poolSize, poolSize, 1, TimeUnit.SECONDS, queue) 
    val latch = new CountDownLatch(count) 
    (1 to count).map(i => pool.execute(new Runnable { 
     override def run = { 
     i * i 
     latch.countDown 
     } 
    })) 
    latch.await 
    } 
} 

List(ActorTest,ThreadPoolTest).map { b => 
    b.runBenchmark(numRuns).sum.toDouble/numRuns 
} 

// List[Double] = List(545.45, 44.35) 

ho usato l'astrazione Futuro nelle ActorTest per evitare di passare un messaggio a un altro attore per segnalare il lavoro è stato fatto. Sono stato sorpreso di scoprire che il mio codice attore era oltre 10 volte più lento. Nota che ho anche creato il mio ThreadPoolExecutor con una dimensione del pool iniziale con cui è stato creato il pool di attori predefinito.

Ripensandoci, sembra che abbia abusato dell'astrazione dell'attore. Ho intenzione di utilizzare ThreadPools separati per questi compiti distinti, costosi e di lunga durata.

+0

Nulla sul problema descritto ha bisogno di attori. Dato che stai dividendo il lavoro in un numero di blocchi identici, puoi semplicemente usare i futures - guarda la mia risposta sotto –

risposta

6

Non importa quanti attori che hai, se non sei la configurazione di pianificazione in modo esplicito, tutti loro sono appoggiati con un unico forchetta /unirsi scheduler (in esecuzione nei confronti di un pool di thread con una capacità 4, se non mi sbaglio). Ecco da dove viene la fame.

  1. Si dovrebbe provare diverse utilità di pianificazione per la vostra piscina di attori, per trovare quello che mostra la migliore performance (provate ResizableThreadPoolScheduler, se si vuole massimizzare il parallelismo con il più discussioni possibile)
  2. è necessario avere uno schedulatore separato per l'enorme pool di attori (altri attori del tuo sistema non lo usano)
  3. Come è stato suggerito da @DaGGeRRz puoi provare il framework Akka che offre dispatcher configurabili (ad esempio, il furto del lavoro il bilanciamento del carico il dispatcher sposta gli eventi dalle caselle di posta degli attori impegnati agli attori oziosi)

Dai commenti per impostazione predefinita Actor realizzazione:

Il sistema run-time può essere configurato per utilizzare una più grande dimensione pool di thread (per esempio, impostando la proprietà actors.corePoolSize JVM). Il metodo del tratto Actor scheduler può essere sostituita per restituire un ResizableThreadPoolScheduler, che ridimensiona sua piscina filo per evitare inedia causata da attori che invocano metodi blocco arbitrario. La proprietà actors.enableForkJoin JVM può essere impostata su false, nel qual caso uno ResizableThreadPoolScheduler viene utilizzato per impostazione predefinita per l'esecuzione di attori.

Inoltre: un'interessante thread on schedulers a scala-lang.

+2

Vasil ha ragione sull'utilizzo dei thread. Pensavo erroneamente che gli attori creati dalla forma abbreviata di thread-block generassero un thread per attore, ma come lui dice, sono tutti in esecuzione dal pool di thread di Scala Actor. Cancellando la mia risposta, Vasil lo copre meglio. – DaGGeRRz

+0

Grazie Vasil. Ho deciso di andare con un threadpool (vedi modifica su OP) per le prestazioni alla luce del fatto che in questo caso non avevo davvero bisogno di usare Actors. – Collin

3

Non ho usato attori con quella sintassi, ma per impostazione predefinita penso che tutti gli attori di scala utilizzino un pool di thread.

Vedi How to designate a thread pool for actors

+0

Sì, vuole impedire ai 2600 attori del lavoro di affamare gli altri che ha davvero bisogno di metterli su pool di thread separati. –

4

Dal tuo esempio sembra che non sia effettivamente necessario utilizzare gli attori, in quanto non si trasmettono messaggi alle unità di lavoro, non si risponde o si esegue il ciclo.

Perché non creare un carico di Future s e quindi attendere che finiscano? In questo modo, la forcella sottostante Registrazione piscina è completamente libero di decidere il livello appropriato di parallelismo (cioè # di fili) per il sistema:

import actors.Futures._ 
def mkFuture(i : Int) = future { 
    doExpensiveStuff(i, limit) 
} 
val fs = (1 to range by limit).map(mkFuture) 
awaitAll(timeout, fs) //wait on the work all finishing 

nota che si sta solo andando a beneficio dal parallelismo di trasformazione più attività contemporaneamente rispetto al sistema hanno core se il costoso lavoro non è legato alla CPU (forse è legato all'IO).

+0

Futures in scala.actors.Le attività sono solo astrazioni sugli attori, quindi alla fine si ottiene lo stesso problema. Il set iniziale consuma tutti i thread nella piscina e gli altri rimangono affamati. Se hai compiti con caratteristiche comportamentali drammaticamente differenti (ad esempio una corsa molto lunga o una corsa molto breve) è una buona idea separarli.Posso immaginare un pool di thread più intelligente che si partiziona automaticamente, ma non ne conosco uno. –

+1

Non è un attore una cosa piuttosto pesante da creare solo per eseguire qualcosa nel pool di thread sottostante? –

+0

Grazie per la risposta. Avevo effettivamente provato Futures come alternativa prima di postare, ma ho trovato lo stesso comportamento da fame a causa della relazione con gli attori, come ha sottolineato Erik. – Collin