2011-11-07 12 views
7

problema: ho un portafoglio di titoli che devono essere elaborati in modo parallelo. In Java ho usato un threadpool per elaborare ogni sicurezza e utilizzare un latch per il conto alla rovescia. Una volta completato, faccio un po 'di fusione, ecc.fork e join utilizzando Akka

Quindi messaggio al mio SecurityProcessor (che è un attore), e aspetto che tutti i futures siano completati. Alla fine uso un MergeHelper per eseguire la post-elaborazione. Il SecurityProcessor prende un titolo, fa qualche I/O e di elaborazione e le risposte di un Security

val listOfFutures = new ListBuffer[Future[Security]]() 
    var portfolioResponse: Portfolio = _ 
    for (security <- portfolio.getSecurities.toList) { 
    val securityProcessor = actorOf[SecurityProcessor].start() 
    listOfFutures += (securityProcessor ? security) map { 
     _.asInstanceOf[Security] 
    } 
    } 
    val futures = Future.sequence(listOfFutures.toList) 
    futures.map { 
    listOfSecurities => 
     portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities) 
    }.get 

È questo corretta progettazione, e c'è un/modo migliore dispositivo di raffreddamento per implementare questo problema comune con Akka?

risposta

8
val futureResult = Future.sequence(
        portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] } 
       ) map { securities => MergeHelper.merge(portfolio, securities) } 
+0

veramente piaciuto questo suggerimento e funziona come previsto fino a quando ho dovuto dividere e aggiungere mucchio di dichiarazioni Eventhandler.info per eseguire il debug un problema :( –

+0

def debug [T] (t: T): T = {EventHandler .info (t); t} –

+0

akka è fantastico !! –