2014-07-12 13 views
14

Supponiamo che si stia utilizzando un oggetto multiprocessing.Pool e che si stia utilizzando l'impostazione initializer del costruttore per passare una funzione di inizializzazione che quindi crea una risorsa nello spazio dei nomi globale. Si supponga che la risorsa abbia un gestore di contesto. Come gestiresti il ​​ciclo di vita della risorsa gestita dal contesto, a condizione che debba vivere attraverso la vita del processo, ma che venga adeguatamente ripulita alla fine?Gestori del contesto e pool di multielaborazione

Finora, ho qualcosa di un po 'come questo:

resource_cm = None 
resource = None 


def _worker_init(args): 
    global resource 
    resource_cm = open_resource(args) 
    resource = resource_cm.__enter__() 

Da qui in poi, i processi di piscina possono utilizzare la risorsa. Fin qui tutto bene. Ma maneggiare la pulizia è un po 'più complicato, dal momento che la classe multiprocessing.Pool non fornisce un argomento destructor o deinitializer.

Una delle mie idee è utilizzare il modulo atexit e registrare la pulizia nell'inizializzatore. Qualcosa del genere:

def _worker_init(args): 
    global resource 
    resource_cm = open_resource(args) 
    resource = resource_cm.__enter__() 

    def _clean_up(): 
     resource_cm.__exit__() 

    import atexit 
    atexit.register(_clean_up) 

È un buon approccio? C'è un modo più semplice per farlo?

EDIT: atexit non sembra funzionare. Almeno non nel modo in cui lo sto usando sopra, quindi al momento non ho ancora una soluzione per questo problema.

risposta

21

In primo luogo, questa è davvero una grande domanda! Dopo aver scavato un po 'intorno nel codice multiprocessing, credo di aver trovato un modo per farlo:

quando si avvia un multiprocessing.Pool, internamente l'oggetto Pool crea un oggetto multiprocessing.Process per ogni membro della piscina. Quando quei sotto-processi stanno iniziando up, che chiamano una funzione _bootstrap, che assomiglia a questo:

def _bootstrap(self): 
    from . import util 
    global _current_process 
    try: 
     # ... (stuff we don't care about) 
     util._finalizer_registry.clear() 
     util._run_after_forkers() 
     util.info('child process calling self.run()') 
     try: 
      self.run() 
      exitcode = 0 
     finally: 
      util._exit_function() 
     # ... (more stuff we don't care about) 

Il metodo run è quello che corre in realtà il target ti ha dato l'oggetto Process. Per un processo Pool si tratta di un metodo con un ciclo while a esecuzione prolungata che attende l'arrivo di elementi di lavoro su una coda interna. Quello che è veramente interessante per noi è quello che è successo dopo ilself.run: util._exit_function() viene chiamato.

Come si è visto, che la funzione fa alcuni fino pulita che suona molto come quello che stai cercando:

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, 
        active_children=active_children, 
        current_process=current_process): 
    # NB: we hold on to references to functions in the arglist due to the 
    # situation described below, where this function is called after this 
    # module's globals are destroyed. 

    global _exiting 

    info('process shutting down') 
    debug('running all "atexit" finalizers with priority >= 0') # Very interesting! 
    _run_finalizers(0) 

Ecco il docstring di _run_finalizers:

def _run_finalizers(minpriority=None): 
    ''' 
    Run all finalizers whose exit priority is not None and at least minpriority 

    Finalizers with highest priority are called first; finalizers with 
    the same priority will be called in reverse order of creation. 
    ''' 

Il metodo in realtà scorre un elenco di callback finalizzatori e li esegue:

items = [x for x in _finalizer_registry.items() if f(x)] 
items.sort(reverse=True) 

for key, finalizer in items: 
    sub_debug('calling %s', finalizer) 
    try: 
     finalizer() 
    except Exception: 
     import traceback 
     traceback.print_exc() 

Perf ect. Quindi, come possiamo entrare nel _finalizer_registry?C'è un oggetto non documentato chiamato Finalize in multiprocessing.util che è responsabile per l'aggiunta di una richiamata al Registro di sistema:

class Finalize(object): 
    ''' 
    Class which supports object finalization using weakrefs 
    ''' 
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): 
     assert exitpriority is None or type(exitpriority) is int 

     if obj is not None: 
      self._weakref = weakref.ref(obj, self) 
     else: 
      assert exitpriority is not None 

     self._callback = callback 
     self._args = args 
     self._kwargs = kwargs or {} 
     self._key = (exitpriority, _finalizer_counter.next()) 
     self._pid = os.getpid() 

     _finalizer_registry[self._key] = self # That's what we're looking for! 

Ok, in modo da mettere tutto insieme in un esempio:

import multiprocessing 
from multiprocessing.util import Finalize 

resource_cm = None 
resource = None 

class Resource(object): 
    def __init__(self, args): 
     self.args = args 

    def __enter__(self): 
     print("in __enter__ of %s" % multiprocessing.current_process()) 
     return self 

    def __exit__(self, *args, **kwargs): 
     print("in __exit__ of %s" % multiprocessing.current_process()) 

def open_resource(args): 
    return Resource(args) 

def _worker_init(args): 
    global resource 
    print("calling init") 
    resource_cm = open_resource(args) 
    resource = resource_cm.__enter__() 
    # Register a finalizer 
    Finalize(resource, resource.__exit__, exitpriority=16) 

def hi(*args): 
    print("we're in the worker") 

if __name__ == "__main__": 
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",)) 
    pool.map(hi, range(pool._processes)) 
    pool.close() 
    pool.join() 

uscita:

calling init 
in __enter__ of <Process(PoolWorker-1, started daemon)> 
calling init 
calling init 
in __enter__ of <Process(PoolWorker-2, started daemon)> 
in __enter__ of <Process(PoolWorker-3, started daemon)> 
calling init 
in __enter__ of <Process(PoolWorker-4, started daemon)> 
we're in the worker 
we're in the worker 
we're in the worker 
we're in the worker 
in __exit__ of <Process(PoolWorker-1, started daemon)> 
in __exit__ of <Process(PoolWorker-2, started daemon)> 
in __exit__ of <Process(PoolWorker-3, started daemon)> 
in __exit__ of <Process(PoolWorker-4, started daemon)> 

Come si può vedere __exit__ viene chiamato in tutti i nostri lavoratori quando abbiamo join() il pool.

Problemi correlati