2015-02-13 10 views
26

Ho creato con successo un RESTful microservice con Python asyncio e aiohttp che ascolta un evento POST per raccogliere eventi in tempo reale da vari alimentatori.Come combinare l'asyncio di Python con i thread?

Quindi crea una struttura in memoria per memorizzare nella cache gli ultimi 24 h di eventi in una struttura di defaultdict/deque nidificata.

Ora mi piacerebbe controllare periodicamente la struttura del disco, preferibilmente usando il pickle.

Poiché la struttura della memoria può essere> 100 MB, vorrei evitare di trattenere l'elaborazione dell'evento in ingresso per il tempo necessario per il checkpoint della struttura.

Preferisco creare una copia snapshot (ad esempio deepcopy) della struttura e quindi prendermi il mio tempo per scriverlo su disco e ripetere su un intervallo di tempo preimpostato.

Sono stato alla ricerca di esempi su come combinare i thread (ed è un thread anche la soluzione migliore per questo?) E asyncio per questo scopo, ma non riuscivo a trovare qualcosa che mi avrebbe aiutato.

Qualsiasi suggerimento per iniziare è molto apprezzato!

+0

Ho usato i suggerimenti di dano e ho creato una configurazione multi-thread molto semplice che controlla l'archivio eventi in memoria ogni 60 secondi su disco. Ecco un link al file repo git che contiene l'intera logica: https://github.com/fxstein/SentientHome/blob/master/engine/event.engine.py – fxstein

risposta

36

È abbastanza semplice delegare un metodo ad un filo o sottoprocessi utilizzando BaseEventLoop.run_in_executor:

import asyncio 
import time 
from concurrent.futures import ProcessPoolExecutor 

def cpu_bound_operation(x): 
    time.sleep(x) # This is some operation that is CPU-bound 

@asyncio.coroutine 
def main(): 
    # Run cpu_bound_operation in the ProcessPoolExecutor 
    # This will make your coroutine block, but won't block 
    # the event loop; other coroutines can run in meantime. 
    yield from loop.run_in_executor(p, cpu_bound_operation, 5) 


loop = asyncio.get_event_loop() 
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes 
loop.run_until_complete(main()) 

quanto riguarda se utilizzare un ProcessPoolExecutor o ThreadPoolExecutor, che è piuttosto difficile da dire; il decapaggio di un oggetto di grandi dimensioni sicuramente mangerà alcuni cicli della CPU, che inizialmente faresti pensare che ProcessPoolExecutor è la strada da percorrere. Tuttavia, il passaggio dell'oggetto 100MB a un valore Process nel pool richiederebbe il picchettaggio dell'istanza nel processo principale, l'invio dei byte al processo figlio tramite IPC, deselezionandolo nel figlio e quindi il pickling dello di nuovo in modo da poterlo scrivere su disco. Detto questo, la mia ipotesi è che l'overhead di pickling/unpickling sarà abbastanza grande da farti star bene usando lo ThreadPoolExecutor, anche se stai andando a fare un buon risultato a causa della GIL.

Detto questo, è molto semplice testare entrambi i modi e scoprirlo con certezza, quindi tanto vale farlo.

+0

Grazie mille! Dopo tutto è stato molto più facile. Sei corretto ho preso il percorso di utilizzare ThreadPoolExecutor e funziona bene. Scrivere i punti di controllo sempre 60 secondi senza ostacolare l'elaborazione degli eventi. – fxstein

0

Ho anche utilizzato run_in_executor, ma ho trovato questa funzione un po 'grossolana nella maggior parte dei casi, dal momento che richiede partial() per parole chiave args e non lo chiamerò mai con qualcosa di diverso da un singolo executor e il ciclo di eventi predefinito. Quindi ho creato un wrapper conveniente con impostazioni predefinite e gestione automatica degli argomenti delle parole chiave.

from time import sleep 
import asyncio as aio 
loop = aio.get_event_loop() 

class Executor: 
    """In most cases, you can just use the 'execute' instance as a 
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in 
    the executor, assign result to y. The defaults can be changed, though, 
    with your own instantiation of Executor, i.e. execute = 
    Executor(nthreads=4)""" 
    def __init__(self, loop=loop, nthreads=1): 
     from concurrent.futures import ThreadPoolExecutor 
     self._ex = ThreadPoolExecutor(nthreads) 
     self._loop = loop 
    def __call__(self, f, *args, **kw): 
     from functools import partial 
     return self._loop.run_in_executor(self._ex, partial(f, *args, **kw)) 
execute = Executor() 

... 

def cpu_bound_operation(t, alpha=30): 
    sleep(t) 
    return 20*alpha 

async def main(): 
    y = await execute(cpu_bound_operation, 5, alpha=-2) 

loop.run_until_complete(main()) 
Problemi correlati