2010-11-11 12 views
7

Provare a comprendere come pensare in termini di attori anziché di thread. Sono un po 'perplesso sul seguente caso d'uso:Transizione dal modello di thread agli attori

in considerazione un sistema che ha un processo produttore che crea il lavoro (per esempio, la lettura dei dati da un file), e una serie di processi di lavoro che consumano il lavoro (ad esempio analizzando i dati e scrivendoli in un database). Le velocità con cui il lavoro viene prodotto e consumato possono variare e il sistema dovrebbe rimanere solido a questo scopo. Ad esempio, se i lavoratori non riescono a tenere il passo, il produttore dovrebbe rilevarlo e alla fine rallentare o aspettare.

Questo è abbastanza facile da implementare con i fili:

val producer:Iterator[Work] = createProducer() 
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE) 
val workers = (0 until NUM_WORKERS) map { i => 
    new Thread() { 
    override def run() = { 
     while (true) { 
     try { 
      // take next unit of work, waiting if necessary 
      val work = queue.take() 
      process(work) 
     } 
     catch { 
      case e:InterruptedException => return 
     } 
     } 
    } 
    } 
} 

// start the workers 
workers.foreach(_.start()) 

while (producer.hasNext) { 
    val work = producer.next() 
    // add new unit of work, waiting if necessary 
    queue.put(work) 
} 

while (!queue.isEmpty) { 
    // wait until queue is drained 
    queue.wait() 
} 

// stop the workers 
workers.foreach(_.interrupt()) 

Non c'è niente di sbagliato in questo modello, e ho usato con successo prima. Questo esempio è probabilmente troppo dettagliato, poiché l'utilizzo di un Executor o di un CompletionService si adatta bene a questo compito. Ma mi piace l'astrazione dell'attore, e penso che sia più facile ragionare in molti casi. C'è un modo per riscrivere questo esempio usando gli attori, specialmente assicurandosi che non ci siano buffer overflow (ad esempio caselle postali complete, messaggi abbandonati, ecc.)?

risposta

3

Poiché gli attori elaborano i messaggi "offline" (ossia il consumo dei messaggi non è collegato alla loro ricezione), è difficile vedere come si possa avere un analogo esatto del "produttore attende che i consumatori raggiungano".

L'unica cosa che posso pensare è che i consumatori richiedono il lavoro dal l'attore produttore (che utilizza reply):

case object MoreWorkPlease 
class Consumer(prod : Producer) extends Actor { 
    def act = { 
    prod ! MoreWorkPlease 
    loop { 
     react { 
     case Work(payload) => doStuff(payload); reply(MoreWorkPlease) 
     } 
    } 
    } 
} 

class Producer extends Actor { 
    def act = loop { 
    react { 
     case MoreWorkPlease => reply(Work(getNextItem)) 
    } 
    } 
} 

Questo non è perfetto, naturalmente, perché il produttore non "leggere avanti "e ottiene lavoro solo quando un consumatore è pronto per questo. L'utilizzo sarebbe qualcosa di simile:

val prod = new Producer 
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start()) 
prod.start() 
+0

Hrm, questa è una soluzione che avevo pensato. Probabilmente è sufficiente, ma la mia preoccupazione è che se i lavoratori superano il produttore, la mancanza di un buffer di lavoro si traduce in prestazioni degradate. – toluju

+0

@toluju - Iniziare chiedendo a tutti i consumatori di chiedere lavoro e chiedere al produttore di reagire a questi messaggi, ma riceverli e metterli in coda se non c'è ancora molto lavoro da fare. (Poi, una volta che c'è lavoro, può dividerlo in elementi nella coda.) –

Problemi correlati