Sto provando a creare una classe che può eseguire un processo separato per andare a fare un lavoro che richiede molto tempo, lanciare una serie di questi da un modulo principale e quindi attendere che tutti finiscano. Voglio avviare i processi una volta e poi continuare a dar loro da mangiare cose da fare piuttosto che creare e distruggere i processi. Ad esempio, forse ho 10 server che eseguono il comando dd, quindi voglio che tutti scp un file, ecc.Come usare la multiprocessing con istanze di classe in Python?
Il mio obiettivo finale è creare una classe per ciascun sistema che tenga traccia delle informazioni per il sistema in che è legato a come indirizzo IP, log, runtime, ecc. Ma quella classe deve essere in grado di avviare un comando di sistema e quindi restituire l'esecuzione al chiamante mentre viene eseguito quel comando di sistema, per seguire il risultato del comando di sistema in un secondo momento .
Il mio tentativo ha esito negativo perché non riesco a inviare un metodo di istanza di una classe sulla pipe al sottoprocesso tramite pickle. Quelli non sono intercettabili. Ho quindi provato a risolverlo in vari modi ma non riesco a capirlo. Come può essere corretto il mio codice per fare questo? A che serve il multiprocessing se non puoi inviare qualcosa di utile?
Esiste una buona documentazione di multiprocessing utilizzata con le istanze di classe? L'unico modo per far funzionare il modulo multiprocessing è su funzioni semplici. Ogni tentativo di usarlo all'interno di un'istanza di classe ha fallito. Forse dovrei passare gli eventi, invece? Non capisco come farlo ancora.
import multiprocessing
import sys
import re
class ProcessWorker(multiprocessing.Process):
"""
This class runs as a separate process to execute worker's commands in parallel
Once launched, it remains running, monitoring the task queue, until "None" is sent
"""
def __init__(self, task_q, result_q):
multiprocessing.Process.__init__(self)
self.task_q = task_q
self.result_q = result_q
return
def run(self):
"""
Overloaded function provided by multiprocessing.Process. Called upon start() signal
"""
proc_name = self.name
print '%s: Launched' % (proc_name)
while True:
next_task_list = self.task_q.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % (proc_name)
self.task_q.task_done()
break
next_task = next_task_list[0]
print '%s: %s' % (proc_name, next_task)
args = next_task_list[1]
kwargs = next_task_list[2]
answer = next_task(*args, **kwargs)
self.task_q.task_done()
self.result_q.put(answer)
return
# End of ProcessWorker class
class Worker(object):
"""
Launches a child process to run commands from derived classes in separate processes,
which sit and listen for something to do
This base class is called by each derived worker
"""
def __init__(self, config, index=None):
self.config = config
self.index = index
# Launce the ProcessWorker for anything that has an index value
if self.index is not None:
self.task_q = multiprocessing.JoinableQueue()
self.result_q = multiprocessing.Queue()
self.process_worker = ProcessWorker(self.task_q, self.result_q)
self.process_worker.start()
print "Got here"
# Process should be running and listening for functions to execute
return
def enqueue_process(target): # No self, since it is a decorator
"""
Used to place an command target from this class object into the task_q
NOTE: Any function decorated with this must use fetch_results() to get the
target task's result value
"""
def wrapper(self, *args, **kwargs):
self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
return wrapper
def fetch_results(self):
"""
After all processes have been spawned by multiple modules, this command
is called on each one to retreive the results of the call.
This blocks until the execution of the item in the queue is complete
"""
self.task_q.join() # Wait for it to to finish
return self.result_q.get() # Return the result
@enqueue_process
def run_long_command(self, command):
print "I am running number % as process "%number, self.name
# In here, I will launch a subprocess to run a long-running system command
# p = Popen(command), etc
# p.wait(), etc
return
def close(self):
self.task_q.put(None)
self.task_q.join()
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(5):
worker = Worker(config, index)
worker.run_long_command("ls /")
workers.append(worker)
for worker in workers:
worker.fetch_results()
# Do more work... (this would actually be done in a distributor in another class)
for worker in workers:
worker.close()
Edit: ho provato a spostare la classe ProcessWorker
e la creazione delle code multiprocessing al di fuori della classe Worker
e poi ha tentato di fare la serializzazione manualmente l'istanza dei lavoratori. Anche quello non funziona e ottengo un errore
RuntimeError: Queue objects should only be shared between processes through inheritance
. Ma sto solo passando dei riferimenti di quelle code nell'istanza worker ?? Mi manca qualcosa di fondamentale. Ecco il codice modificato dalla sezione principale:
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(1):
task_q = multiprocessing.JoinableQueue()
result_q = multiprocessing.Queue()
process_worker = ProcessWorker(task_q, result_q)
worker = Worker(config, index, process_worker, task_q, result_q)
something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues??
process_worker.start()
worker.run_long_command("ls /")
Avete visto ['dispy'] (http://dispy.sourceforge.net/)? Potrebbe salvare un mal di testa o due :) –
Non ho trovato alcun esempio per dispy che utilizzava le classi. Tutto sembra funzionare da __main__ e non è così che intendo usarlo. I miei esempi che usano multiprocessing.Process hanno funzionato bene in __main__ ma falliscono quando provo a usare classi e metodi con stato –
So che questo è in ritardo nel gioco, ma se usi un fork di 'multiprocessing' chiamato' pathos.multiprocessing', tu può sottacere facilmente le istanze di classe. Se devi fare il dink con gli oggetti 'Queue' e qualcos'altro, puoi accedere alle 'code' bifase aumentate importando' dall'elaborazione della coda di importazione'. 'pathos.multiprocessing' usa' dill', che ** fa ** serializza e invia le definizioni di classe insieme alle istanze. –