2013-01-05 17 views
21

Sto provando a creare una classe che può eseguire un processo separato per andare a fare un lavoro che richiede molto tempo, lanciare una serie di questi da un modulo principale e quindi attendere che tutti finiscano. Voglio avviare i processi una volta e poi continuare a dar loro da mangiare cose da fare piuttosto che creare e distruggere i processi. Ad esempio, forse ho 10 server che eseguono il comando dd, quindi voglio che tutti scp un file, ecc.Come usare la multiprocessing con istanze di classe in Python?

Il mio obiettivo finale è creare una classe per ciascun sistema che tenga traccia delle informazioni per il sistema in che è legato a come indirizzo IP, log, runtime, ecc. Ma quella classe deve essere in grado di avviare un comando di sistema e quindi restituire l'esecuzione al chiamante mentre viene eseguito quel comando di sistema, per seguire il risultato del comando di sistema in un secondo momento .

Il mio tentativo ha esito negativo perché non riesco a inviare un metodo di istanza di una classe sulla pipe al sottoprocesso tramite pickle. Quelli non sono intercettabili. Ho quindi provato a risolverlo in vari modi ma non riesco a capirlo. Come può essere corretto il mio codice per fare questo? A che serve il multiprocessing se non puoi inviare qualcosa di utile?

Esiste una buona documentazione di multiprocessing utilizzata con le istanze di classe? L'unico modo per far funzionare il modulo multiprocessing è su funzioni semplici. Ogni tentativo di usarlo all'interno di un'istanza di classe ha fallito. Forse dovrei passare gli eventi, invece? Non capisco come farlo ancora.

import multiprocessing 
import sys 
import re 

class ProcessWorker(multiprocessing.Process): 
    """ 
    This class runs as a separate process to execute worker's commands in parallel 
    Once launched, it remains running, monitoring the task queue, until "None" is sent 
    """ 

    def __init__(self, task_q, result_q): 
     multiprocessing.Process.__init__(self) 
     self.task_q = task_q 
     self.result_q = result_q 
     return 

    def run(self): 
     """ 
     Overloaded function provided by multiprocessing.Process. Called upon start() signal 
     """ 
     proc_name = self.name 
     print '%s: Launched' % (proc_name) 
     while True: 
      next_task_list = self.task_q.get() 
      if next_task is None: 
       # Poison pill means shutdown 
       print '%s: Exiting' % (proc_name) 
       self.task_q.task_done() 
       break 
      next_task = next_task_list[0] 
      print '%s: %s' % (proc_name, next_task) 
      args = next_task_list[1] 
      kwargs = next_task_list[2] 
      answer = next_task(*args, **kwargs) 
      self.task_q.task_done() 
      self.result_q.put(answer) 
     return 
# End of ProcessWorker class 

class Worker(object): 
    """ 
    Launches a child process to run commands from derived classes in separate processes, 
    which sit and listen for something to do 
    This base class is called by each derived worker 
    """ 
    def __init__(self, config, index=None): 
     self.config = config 
     self.index = index 

     # Launce the ProcessWorker for anything that has an index value 
     if self.index is not None: 
      self.task_q = multiprocessing.JoinableQueue() 
      self.result_q = multiprocessing.Queue() 

      self.process_worker = ProcessWorker(self.task_q, self.result_q) 
      self.process_worker.start() 
      print "Got here" 
      # Process should be running and listening for functions to execute 
     return 

    def enqueue_process(target): # No self, since it is a decorator 
     """ 
     Used to place an command target from this class object into the task_q 
     NOTE: Any function decorated with this must use fetch_results() to get the 
     target task's result value 
     """ 
     def wrapper(self, *args, **kwargs): 
      self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled! 
     return wrapper 

    def fetch_results(self): 
     """ 
     After all processes have been spawned by multiple modules, this command 
     is called on each one to retreive the results of the call. 
     This blocks until the execution of the item in the queue is complete 
     """ 
     self.task_q.join()       # Wait for it to to finish 
     return self.result_q.get()     # Return the result 

    @enqueue_process 
    def run_long_command(self, command): 
     print "I am running number % as process "%number, self.name 

     # In here, I will launch a subprocess to run a long-running system command 
     # p = Popen(command), etc 
     # p.wait(), etc 
     return 

    def close(self): 
     self.task_q.put(None) 
     self.task_q.join() 

if __name__ == '__main__': 
    config = ["some value", "something else"] 
    index = 7 
    workers = [] 
    for i in range(5): 
     worker = Worker(config, index) 
     worker.run_long_command("ls /") 
     workers.append(worker) 
    for worker in workers: 
     worker.fetch_results() 

    # Do more work... (this would actually be done in a distributor in another class) 

    for worker in workers: 
     worker.close() 

Edit: ho provato a spostare la classe ProcessWorker e la creazione delle code multiprocessing al di fuori della classe Worker e poi ha tentato di fare la serializzazione manualmente l'istanza dei lavoratori. Anche quello non funziona e ottengo un errore

RuntimeError: Queue objects should only be shared between processes through inheritance

. Ma sto solo passando dei riferimenti di quelle code nell'istanza worker ?? Mi manca qualcosa di fondamentale. Ecco il codice modificato dalla sezione principale:

if __name__ == '__main__': 
    config = ["some value", "something else"] 
    index = 7 
    workers = [] 
    for i in range(1): 
     task_q = multiprocessing.JoinableQueue() 
     result_q = multiprocessing.Queue() 
     process_worker = ProcessWorker(task_q, result_q) 
     worker = Worker(config, index, process_worker, task_q, result_q) 
     something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues?? 
     process_worker.start() 
     worker.run_long_command("ls /") 
+0

Avete visto ['dispy'] (http://dispy.sourceforge.net/)? Potrebbe salvare un mal di testa o due :) –

+2

Non ho trovato alcun esempio per dispy che utilizzava le classi. Tutto sembra funzionare da __main__ e non è così che intendo usarlo. I miei esempi che usano multiprocessing.Process hanno funzionato bene in __main__ ma falliscono quando provo a usare classi e metodi con stato –

+0

So che questo è in ritardo nel gioco, ma se usi un fork di 'multiprocessing' chiamato' pathos.multiprocessing', tu può sottacere facilmente le istanze di classe. Se devi fare il dink con gli oggetti 'Queue' e qualcos'altro, puoi accedere alle 'code' bifase aumentate importando' dall'elaborazione della coda di importazione'. 'pathos.multiprocessing' usa' dill', che ** fa ** serializza e invia le definizioni di classe insieme alle istanze. –

risposta

8

Invece di tentare di inviare un metodo stesso (che è impraticabile), provare a inviare un nome di un metodo per eseguire .

A condizione che ogni lavoratore esegua lo stesso codice, è una questione di semplice getattr(self, task_name).

mi piacerebbe passare tuple (task_name, task_args), dove sono stati task_args un dict da alimentare direttamente al metodo compito:

next_task_name, next_task_args = self.task_q.get() 
if next_task_name: 
    task = getattr(self, next_task_name) 
    answer = task(**next_task_args) 
    ... 
else: 
    # poison pill, shut down 
    break 
+1

Che non funziona ... Ottengo l'errore "AttributeError: 'ProcessWorker' L'oggetto non ha attributo 'run_long_command'".Non mi aspetto che funzioni poiché ProcessWorker non ha nessuno dei metodi esistenti nella classe Worker. Voglio inviare il metodo sulla pipe (con informazioni di stato) in modo che il processo remoto possa utilizzare tutte le informazioni di stato. Non vedo davvero il punto del modulo multiprocesso se tutto ciò che farà è eseguire la funzione stateless sull'altro lato. –

+2

Mi dispiace, ma devo ripetere. Non puoi _ inviare un metodo sul tubo. Questo è il motivo per cui "pickle" si lamenta per questo. L'invio di codice eseguibile non è impossibile, ma ottiene molto più coinvolto che sta semplicemente deserializzando un oggetto codice. È necessario implementare in anticipo i metodi che si desidera eseguire nella classe Worker. Se hai bisogno di inviare codice non conosciuto in anticipo, la tua scommessa migliore è inviare il sorgente Python come stringa, quindi chiamare 'compile' e' eval' su di esso. Se si desidera inviare un metodo con uno stato, inserire tutto lo stato negli argomenti del metodo o utilizzare un database condiviso. – 9000

+0

WRT che esegue i metodi stateless: si dispone di pipe che possono contenere lo stato. Distribuisci lo stato iniziale a diversi processi, quindi raccogli i risultati. Se si desidera uno stato altamente condiviso (ad es. Geometria per ray tracing), si utilizza un database (in memoria), qualsiasi cosa da memcached a un RDBMS regolare. Utilizzare lo stato globale _mutable_ di solito è un'idea abbastanza brutta. Se necessario, utilizzare un processo di arbitro che legge dalle pipe e risolve i conflitti (ad esempio un database). – 9000

21

Quindi, il problema era che ero supponendo che Python stava facendo una sorta di magia che è in qualche modo diverso dal modo in cui funziona C++/fork(). Ho in qualche modo pensato che Python copiasse solo la classe, non l'intero programma in un processo separato. Ho seriamente sprecato giorni a cercare di farlo funzionare perché tutti i discorsi sulla serializzazione dei pickle mi hanno fatto pensare che in realtà ha inviato tutto sulla pipe. Sapevo che certe cose non potevano essere inviate oltre la pipa, ma pensavo che il mio problema fosse che non stavo confezionando le cose correttamente.

Questo tutto avrebbe potuto essere evitato se i documenti Python mi hanno dato una vista di 10.000 ft di cosa succede quando questo modulo viene utilizzato. Certo, mi dice quali sono i metodi del modulo multiprocesso e mi fornisce alcuni esempi di base, ma quello che voglio sapere è qual è la "Teoria dell'operazione" dietro le quinte! Ecco il tipo di informazioni che avrei potuto usare. Per favore, interpella se la mia risposta è disattivata. Mi aiuterà a imparare.

Quando si avvia un processo utilizzando questo modulo, l'intero programma viene copiato in un altro processo. Ma dal momento che non si tratta del processo "__main__" e il mio codice lo stava verificando, non spara all'infinito un altro processo. Si ferma e si siede là fuori in attesa di qualcosa da fare, come uno zombi. Tutto ciò che è stato inizializzato nel genitore al momento della chiamata multiprocess.Process() è tutto pronto e pronto. Una volta inserito qualcosa nel multiprocesso. Queue o memoria condivisa, pipe, ecc. (Comunque stai comunicando), il processo separato lo riceve e si mette al lavoro. Può attingere a tutti i moduli importati e impostare come se fosse il genitore. Tuttavia, una volta che alcune variabili di stato interne cambiano nel processo principale o separato, tali modifiche sono isolate. Una volta che il processo viene generato, diventa necessario sincronizzarlo, se necessario, tramite una coda, pipe, memoria condivisa, ecc.

Ho buttato fuori il codice e ricominciato, ma ora sto solo mettendo una funzione extra nel ProcessWorker, un metodo "execute" che esegue una riga di comando. Abbastanza semplice. Non devo preoccuparmi di avviare e quindi chiudere una serie di processi in questo modo, il che mi ha causato tutti i tipi di problemi di instabilità e prestazioni in passato in C++. Quando sono passato ai processi di lancio all'inizio e quindi passando i messaggi a quei processi di attesa, le mie prestazioni sono migliorate ed è stato molto stabile.

BTW, ho guardato questo link per ottenere aiuto, che mi ha buttato fuori perché l'esempio mi ha fatto pensare che i metodi sono stati trasportati attraverso le code: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html Il secondo esempio della prima sezione usato "next_task()" che è apparso (per me) per l'esecuzione di un'attività ricevuta tramite la coda.

+1

Come ho notato nel mio commento sulla tua domanda, se vuoi sottaceti un'istanza di classe senza preoccuparti delle dipendenze tanto ... dovresti usare 'dill', che può sia mettere sotto aceto una classe definizione con l'istanza di classe, * o * pickle il codice sorgente e le dipendenze per la maggior parte degli oggetti, comprese le classi definite dall'utente. Il fork di 'multiprocessing' (citato nel commento alla domanda) usa' dill' per la serializzazione ... evitando così la maggior parte dei problemi che stai descrivendo. –

0

REF: https://stackoverflow.com/a/14179779

risposta il 6 gennaio alle 06:03 di David Lynch non è di fatto corretta quando dice di essere stato ingannato da http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html.

Il codice e gli esempi forniti sono corretti e funzionano come pubblicizzato. next_task()è eseguendo un'attività ricevuta tramite la coda: prova a capire che cosa sta facendo il metodo Task.__call__().

Nel mio caso, mi sono incastrato errori di sintassi nella mia implementazione di run(). Sembra che il sottoprocesso non lo segnalerà e fallisce semplicemente in silenzio - lasciando le cose bloccate in strani circuiti! Assicurati di avere una sorta di correttore di sintassi in esecuzione ad es. Flymake/Pyflakes in Emacs.

Debug tramite multiprocessing.log_to_stderr() F mi ha aiutato a restringere il problema.

Problemi correlati