2013-08-21 8 views
8

Capisco come creare un'applicazione non bloccante basata su messaggi in akka e posso facilmente prendere in giro esempi che eseguono operazioni simultanee e restituire i risultati aggregati in un messaggio. Dove ho difficoltà è capire cosa sono le mie opzioni non bloccanti quando la mia applicazione deve rispondere a una richiesta HTTP. L'obiettivo è di ricevere una richiesta e consegnarlo immediatamente a un attore locale o remoto per eseguire il lavoro, che a sua volta lo distribuirà per ottenere un risultato che potrebbe richiedere del tempo. Sfortunatamente sotto questo modello, non capisco come potrei esprimerlo con una serie di "tell" non bloccante piuttosto che bloccare "ask". Se in qualsiasi punto della catena utilizzo un tell, non ho più un futuro per utilizzato come contenuto di risposta finale (richiesto dall'interfaccia framework http che in questo caso è finagle), ma non è importante . Capisco che la richiesta è sulla sua stessa discussione, e il mio esempio è abbastanza elaborato, ma solo provare a capire le mie opzioni di progettazione.Opzioni di non blocco di Akka quando è richiesta una risposta HTTP

In sintesi, se il mio esempio forzato di seguito può essere rielaborato per bloccare meno mi piace molto capire come. Questo è il mio primo utilizzo di akka da una leggera esplorazione di un anno fa, e in ogni articolo, documento e discussione che ho visto dice di non bloccare i servizi.

Le risposte concettuali possono essere utili ma potrebbero anche essere le stesse di quelle che ho già letto. Lavorare/Modificare il mio esempio sarebbe probabilmente la chiave per la mia comprensione del problema esatto che sto tentando di risolvere. Se l'esempio corrente è in genere ciò che deve essere fatto, anche la conferma è utile, quindi non cerco la magia che non esiste.

Notare i seguenti alias:. Importazione com.twitter.util {Future => TwitterFuture, attendono => TwitterAwait}

object Server { 

    val system = ActorSystem("Example-System") 

    implicit val timeout = Timeout(1 seconds) 

    implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = { 
    val promise = TwitterPromise[T] 
    scFuture onComplete { 
     case Success(result) ⇒ promise.setValue(result) 
     case Failure(failure) ⇒ promise.setException(failure) 
    } 
    promise 
    } 

    val service = new Service[HttpRequest, HttpResponse] { 
    def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
     case "https://stackoverflow.com/a/b/c" => 
     val w1 = system.actorOf(Props(new Worker1)) 

     val r = w1 ? "take work" 

     val response: Future[HttpResponse] = r.mapTo[String].map { c => 
      val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
      resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
      resp 
     } 

     response 
    } 
    } 

//val server = Http.serve(":8080", service); TwitterAwait.ready(server) 

    class Worker1 extends Actor with ActorLogging { 
    def receive = { 
     case "take work" => 
     val w2 = context.actorOf(Props(new Worker2)) 
     pipe (w2 ? "do work") to sender 
    } 
    } 

    class Worker2 extends Actor with ActorLogging { 
    def receive = { 
     case "do work" => 
     //Long operation... 
     sender ! "The Work" 
    } 
    } 

    def main(args: Array[String]) { 
    val r = service.apply(
     com.twitter.finagle.http.Request("https://stackoverflow.com/a/b/c") 
    ) 
    println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work 
    } 
} 

Grazie in anticipo per tutti gli orientamenti offerti!

risposta

5

È possibile evitare l'invio di un futuro come un messaggio utilizzando il pipe pattern -cioè, in Worker1 devi scrivere:

pipe(w2 ? "do work") to sender 

Invece di:

sender ! (w2 ? "do work") 

Ora r sarà un Future[String] invece di un Future[Future[String]].


Aggiornamento: la soluzione pipe di cui sopra è un modo generale, per evitare di avere il vostro attore risponde con un futuro. Come Viktor sottolinea in un commento qui sotto, in questo caso si può prendere la Worker1 fuori dal giro del tutto dicendo Worker2 per rispondere direttamente alla attore che (Worker1) ha ottenuto il messaggio da:

w2.tell("do work", sender) 

Questo won 't essere un'opzione se Worker1 è responsabile per operare sulla reazione Worker2 in qualche modo (utilizzando map su w2 ? "do work", combinando molteplici futuri con flatMap o un for -comprehension, ecc), ma se non è necessario, questa versione è più pulito e più efficiente.


Che uccide uno Await.result. È possibile eliminare l'altro scrivendo qualcosa di simile a quanto segue:

val response: Future[HttpResponse] = r.mapTo[String].map { c => 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

Ora è solo bisogno di trasformare questo in un FutureTwitterFuture. Non posso dirvi in ​​cima alla mia testa esattamente come fare questo, ma dovrebbe essere fairly trivial, e sicuramente non richiede il blocco.

+0

Grazie mille per la tua risposta rapida Travis. Questo fa ripulire il futuro e aspetta un po '. Quindi credi che l'uso delle due domande sia effettivamente richiesto (Ovviamente potrei solo vederlo in quel modo - ma volevo assicurarmi)? Aggiornerò il mio codice per includere i tuoi miglioramenti e includerò le conversioni implicite da un futuro di akka a un futuro di Twitter. Non ho ancora familiarità con l'etichetta Stack Overflow, quindi sto dando un +1 per il miglioramento. Qualsiasi informazione aggiuntiva sulle domande sarebbe utile. Grazie! – Eric

+0

È difficile dire se le richieste siano richieste senza sapere di più di cosa sono responsabili questi attori, ma la parte importante è che chiedere non richiede il blocco (c'è un po 'di contabilità in più, ma è ancora asincrono). Suggerirei anche di mantenere esplicita la conversione tra Twitter e il futuro delle librerie standard, dovendo chiamare un metodo di conversione di solito un piccolo prezzo da pagare per evitare una potenziale confusione in un caso come questo. –

+0

Grazie mille per l'aiuto e l'intuizione su questo Travis. Questo risolve perfettamente le mie preoccupazioni. – Eric

0

Non devi assolutamente bloccare qui. In primo luogo, aggiornare l'importazione per la roba twitter per:

import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise} 

Sarà necessario il cinguettio Promise quanto questo è l'impl di Future si ritorna dal metodo apply. Quindi, segui ciò che Travis Brown ha detto nella sua risposta in modo che il tuo attore risponda in modo tale da non avere un futuro annidato. Una volta fatto questo, si dovrebbe essere in grado di cambiare il metodo di apply a qualcosa di simile:

def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
    case "https://stackoverflow.com/a/b/c" => 
    val w1 = system.actorOf(Props(new Worker1)) 

    val r = (w1 ? "take work").mapTo[String] 
    val prom = new TwitterPromise[HttpResponse] 
    r.map(toResponse) onComplete{ 
     case Success(resp) => prom.setValue(resp) 
     case Failure(ex) => prom.setException(ex)    
    } 

    prom 
} 

def toResponse(c:String):HttpResponse = { 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

Questo probabilmente ha bisogno di un po 'di lavoro. Non l'ho installato nel mio IDE, quindi non posso garantirvi che venga compilato, ma credo che l'idea sia valida. Quello che restituisci dal metodo apply è un TwitterFuture che non è stato ancora completato. Sarà completato quando il futuro da parte dell'attore chiede (?) È terminato e sta saltando attraverso un callback non bloccante onComplete.

+0

Avevo aggiornato la mia risposta con le implicite e le modifiche, aggiornato e visto la tua risposta che è essenzialmente la stessa funzionalmente solo in linea. Grazie per aver dedicato del tempo. Presumo da entrambe le tue risposte che le domande siano effettivamente ok. È stato per lo più un esercizio di uccisione delle attese sia con il tuo metodo o il metodo proposto da Travis che ritengo siano equivalenti corretti? Grazie ancora. – Eric

+0

Arrotolare la conversione futura con la logica di generazione della risposta come questa non mi sembra ideale, specialmente se la conversione è necessaria anche altrove (che è probabile). C'è un motivo per cui suggeriresti questo approccio sulla mappatura sul futuro e poi sulla conversione? –

+0

@TravisBrown, solo per semplicità dell'esempio, aggiornerò in un secondo mostrando una 'map' prima di convertire prima di eseguire' onComplete'. Questo è quello di cui parlavi giusto? – cmbaxter

Problemi correlati