2012-12-03 12 views
5

Ho un attore Akka responsabile della gestione delle chiamate http. Io uso scala spedizione per inviare più richieste HTTP su un'API:Come gestire più promesse in un attore (akka)?

urls.foreach { u 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
    for(p <- promise) 
    { 
    p match 
    { 
     case Left(error) => 
     faultHandler(error) 
     case Right(result) => 
     resultHandler(result) 
    } 
    } 

Nella funzione resultHandler, ho incrementare un'istanza variabile nbOfResults e confrontare il numero di chiamate che ho fatto.

def resultHandler(result:String) 
{ 
    this.nbOfResults++ 
    ... 
    if(nbOfResults == nbOfCalls) 
    // Do something 
} 

È sicuro? Si può accedere allo nbOfResults accessibile allo stesso tempo se due chiamate restituiscono i risultati simultaneamente?

Per ora, ho ritenuto che l'attore sia più o meno equivalente a un thread e pertanto le funzioni di callback non vengono eseguite contemporaneamente. È corretto ?

+0

Le risposte che seguono contengono suggerimenti su come farlo, vorrei solo per indicare chiaramente per la cronaca che sì, è Bisogna stare attenti con i callback asincroni, ESSI SARANNO ESEGUITI CONCORRENTI. Quindi, in breve, la gestione di nbOfResults nel codice sopra non è corretta. –

risposta

3

Qui è una variante di risposta Alexey Romanov utilizzando solo la spedizione:

//Promises will be of type Array[Promise[Either[Throwable, String]]] 
val promises = urls.map { u => 
    val service = url(u) 

    Http(service OK as.String).either 
} 

//Http.promise.all transform an Iterable[Promise[A]] into Promise[Iterable[A]] 
//So listPromise is now of type Promise[Array[Either[Throwable, String]]] 
val listPromise = Http.promise.all(promises) 

for (results <- listPromise) { 
    //Here results is of type Array[Either[Throwable, String]] 

    results foreach { result => 
     result match { 
      Left(error) => //Handle error 
      Right(response) => //Handle response 
     } 
    } 
} 
2

C'è un modo di gran lunga migliore:

val promises = urls.map {u => 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
} 

val listPromise = Future.sequence(promises) 

listPromise.onComplete { whatever } 
2

Sono d'accordo con Alexey Romanov sulla sua risposta. In qualunque modo tu scelga di sincronizzare le tue richieste http, attenti al modo in cui stai elaborando il completamento delle promesse. La tua intuizione è corretta in quanto l'accesso concorrente può apparire sullo stato dell'attore. Il modo migliore per gestire questa situazione potrebbe essere quella di fare qualcosa di simile:

def resultHandler(result: String) { 
    //on completion we are sending the result to the actor who triggered the call 
    //as a message 
    self ! HttpComplete(result) 
} 

e riceve la funzione dell'attore:

def receive = { 
    //PROCESS OTHER MESSAGES HERE 
    case HttpComplete(result) => //do something with the result 
} 

In questo modo, è assicurarsi che l'elaborazione dei risultati HTTP non violerà stato dell'attore dall'esterno, ma dal ciclo ricevere dell'attore, che è il modo corretto per farlo

1
val nbOfResults = new java.util.concurrent.atomic.AtomicInteger(nbOfCalls) 

// After particular call was ended  
if (nbOfResults.decrementAndGet <= 0) { 
    // Do something 
} 

[EDIT] Rimosso risposta vecchia con AtomicReference CAS - while (true), compareAndSet , Ecc

+0

Cosa c'è di sbagliato con incrementAndGet? –

+0

Aggiunta una variante di risposta, considerando il suggerimento – idonnie

Problemi correlati