2013-02-21 9 views
5

Ho un gestore di richieste che aggiorna un'entità, lo salva nell'archivio dati, quindi deve eseguire un lavoro aggiuntivo prima di tornare (come fare l'accodamento di un'attività in background e serializzazione json di alcuni risultati). Voglio parallelizzare questo codice, in modo che il lavoro aggiuntivo venga eseguito mentre l'entità viene salvata.Come impedire a ndb di eseguire il batch di una chiamata put_async() e rilasciare immediatamente l'RPC?

Ecco ciò che il mio codice del gestore si riduce a:

class FooHandler(webapp2.RequestHandler): 
    @ndb.toplevel 
    def post(self): 
     foo = yield Foo.get_by_id_async(some_id) 

     # Do some work with foo 

     # Don't yield, as I want to perform the code that follows 
     # while foo is being saved to the datastore. 
     # I'm in a toplevel, so the handler will not exit as long as 
     # this async request is not finished. 
     foo.put_async() 

     taskqueue.add(...) 
     json_result = generate_result() 
     self.response.headers["Content-Type"] = "application/json; charset=UTF-8" 
     self.response.write(json_result) 

Tuttavia, Appstats dimostra che la RPC datastore.Put è stato fatto in serie, dopo taskqueue.Add:

Appstats screenshot

Un po 'di scavo intorno a ndb.context.py indica che una chiamata put_async() finisce per essere aggiunta a un AutoBatcher anziché all'RPC emesso immediatamente.

Quindi presumo che lo venga scaricato quando lo toplevel attende che tutte le chiamate asincrone siano complete.

Capisco che il caricamento in serie abbia vantaggi reali in alcuni scenari, ma nel mio caso qui voglio davvero che l'RPC put venga inviato immediatamente, così posso eseguire altri lavori mentre l'entità viene salvata.

Se lo faccio yield foo.put_async(), tanto sono la stessa cascata in Appstats, ma con datastore.Put stato fatto prima che il resto:

2nd Appstats screenshot

Questo è da aspettarsi, come yield rende mio gestore attendere la put_async() chiamata da completare prima di eseguire il resto del codice.

Ho anche provato ad aggiungere una chiamata a ndb.get_context().flush() subito dopo foo.put_async(), ma le datastore.Put e taskqueue.BulkAdd chiamate sono ancora non effettuata in parallelo secondo Appstats.

Quindi la mia domanda è: come posso forzare la chiamata a put_async() per ignorare il batcher automatico e rilasciare immediatamente l'RPC?

+0

È in produzione o locale? – Lipis

+0

È in produzione. –

risposta

6

Non c'è modo supportato per farlo. Forse dovrebbe esserci. Puoi provare se funziona?

loop - ndb.eventloop.get_event_loop() 
while loop.run_idle(): 
    pass 

Potrebbe essere necessario guardare il codice sorgente del NDB/eventloop.py per vedere cos'altro si potrebbe provare - in pratica si vuole provare la maggior parte di ciò che run0() fa tranne che in attesa di RPC. In particolare, è possibile che si dovrebbe fare questo:

while loop.current: 
    loop.run0() 
while loop.run_idle(): 
    pass 

(Questo ancora non è supportata, perché ci sono altre condizioni si possono avere per gestire troppo, ma quelli non sembra verificarsi nella vostra esempio.)

+0

Ho funzionato chiamando 'ndb.get_context(). Flush()' seguito dai 2 cicli che hai suggerito subito dopo la mia chiamata a 'foo.put_async()'. Credo che ci dovrebbe essere un modo ufficialmente supportato per farlo, poiché non penso che il mio scenario di utilizzo sia raro (salva un'entità, quindi esegui il resto del lavoro del gestore rimanente mentre l'entità viene salvata). Ho inoltrato una richiesta di funzionalità per questo: http://code.google.com/p/googleappengine/issues/detail?id=8863 –

+0

Penso che sia veramente necessario taskqueue.add_async per ottenere la coda di attività rpc in idle/rpc loop in eventloop. http://code.google.com/p/appengine-ndb-experiment/issues/detail?id=180 – tesdal

+0

Qual è stato esattamente il codice che ha funzionato per te? flush() è un tasklet in modo da doverlo restituire, causando probabilmente un ritardo maggiore di quello che vorresti. Comunque, ha convenuto che questa sarebbe stata una caratteristica utile. Forse otterrà più attenzione se lo archivi nel tracker NDB? –

-2

Prova questa, io non sono sicuro al 100% che vi aiuterà:

foo = yield Foo.get_by_id_async(some_id) 
future = foo.put_async() 
future.done() 

richieste NDB vengono messe in autobatcher, il lotto viene inviato a RPC quando si ha bisogno di un risultato. Dal momento che non hai bisogno del risultato di foo.put_async(), non viene inviato fino a quando non fai una nuova chiamata ndb (non lo fai) o finché non termina il @ ndb.toplevel.

Chiamare future.done() non blocca, ma suppongo che potrebbe attivare la richiesta.

Un'altra cosa per cercare di forzare l'operazione è:

ndb.get_context().flush() 
+0

Grazie, ma non fa nulla. 'Future.done()' fa solo 'return self._done' senza alcuna elaborazione, e ho già provato 'Context.flush()'. –

Problemi correlati