2014-05-12 25 views
9

Sono in esecuzione diverse cat | zgrep comandi su un server remoto e raccogliendo la loro produzione individualmente per ulteriori elaborazioni:Python: eseguire gatto sottoprocesso in parallelo

class MainProcessor(mp.Process): 
    def __init__(self, peaks_array): 
     super(MainProcessor, self).__init__() 
     self.peaks_array = peaks_array 

    def run(self): 
     for peak_arr in self.peaks_array: 
      peak_processor = PeakProcessor(peak_arr) 
      peak_processor.start() 

class PeakProcessor(mp.Process): 
    def __init__(self, peak_arr): 
     super(PeakProcessor, self).__init__() 
     self.peak_arr = peak_arr 

    def run(self): 
     command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" ' 
     log_lines = (subprocess.check_output(command, shell=True)).split('\n') 
     process_data(log_lines) 

Ciò, tuttavia, comporta l'esecuzione sequenziale del sottoprocesso ('SSH ... gatto ... ') comandi. Il secondo picco attende il primo alla fine e così via.

Come posso modificare questo codice in modo che le chiamate di sottoprocesso vengano eseguite in parallelo, pur essendo in grado di raccogliere l'output per ciascuna individualmente?

+0

'--mmap' è inutile quando la lettura da un tubo ... – twalberg

risposta

-1

Un altro approccio (piuttosto che l'altro suggerimento di mettere i processi di shell in background) è quello di utilizzare multithreading.

Il run metodo che avete avrebbe poi fare qualcosa del genere:

thread.start_new_thread (myFuncThatDoesZGrep) 

Per raccogliere risultati, è possibile fare qualcosa di simile:

class MyThread(threading.Thread): 
    def run(self): 
     self.finished = False 
     # Your code to run the command here. 
     blahBlah() 
     # When finished.... 
     self.finished = True 
     self.results = [] 

Eseguire la discussione come indicato sopra nel collegamento su multithr eading. Quando il tuo oggetto thread ha myThread.finished == True, puoi raccogliere i risultati tramite myThread.results.

+0

Con questo approccio, come posso ottenere la output di ciascuno una volta terminati i thread in esecuzione? E sto già utilizzando un processo, perché un thread funzionerebbe ma non un processo? – liarspocker

+0

Un processo funzionerà - l'altra risposta dichiarata suggeriva di eseguire il lavoro multiprocesso nella shell reale, usando &. In questo approccio, hai solo un processo Python, ma genera molti processi shell. Nell'approccio multi-threaded, si hanno più processi python, ma un processo shell per processo python. Per raccogliere i risultati da più thread, devi creare classi sottoclasse Thread. Quindi inserisci i risultati da un thread come dati oggetto in quella classe. – FrobberOfBits

+0

Ma non è quello che sta facendo il codice sopra? Sto iniziando un nuovo processo per ogni picco, quindi eseguo il sottoprocesso e process_data dal suo metodo di esecuzione. – liarspocker

24

Non è necessario né multiprocessingthreading per eseguire sottoprocessi in parallelo ad es .:

#!/usr/bin/env python 
from subprocess import Popen 

# run commands in parallel 
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True) 
      for i in range(5)] 
# collect statuses 
exitcodes = [p.wait() for p in processes] 

corre comanda 5 shell contemporaneamente. Nota: qui non vengono utilizzati né i thread né il modulo multiprocessing. Non è necessario aggiungere la e commerciale & ai comandi della shell: Popen non attende il completamento del comando. Devi chiamare lo .wait() in modo esplicito.

È conveniente, ma non è necessario usare fili per raccogliere uscita da sottoprocessi:

#!/usr/bin/env python 
from multiprocessing.dummy import Pool # thread pool 
from subprocess import Popen, PIPE, STDOUT 

# run commands in parallel 
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True, 
        stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) 
      for i in range(5)] 

# collect output in parallel 
def get_lines(process): 
    return process.communicate()[0].splitlines() 

outputs = Pool(len(processes)).map(get_lines, processes) 

correlati: Python threading multiple bash subprocesses?.

Ecco esempio di codice che viene emesso dal più operazioni contemporaneamente nello stesso thread:

#!/usr/bin/env python3 
import asyncio 
import sys 
from asyncio.subprocess import PIPE, STDOUT 

@asyncio.coroutine 
def get_lines(shell_command): 
    p = yield from asyncio.create_subprocess_shell(shell_command, 
      stdin=PIPE, stdout=PIPE, stderr=STDOUT) 
    return (yield from p.communicate())[0].splitlines() 

if sys.platform.startswith('win'): 
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows 
    asyncio.set_event_loop(loop) 
else: 
    loop = asyncio.get_event_loop() 

# get commands output in parallel 
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"' 
        .format(i=i, e=sys.executable)) for i in range(5)] 
print(loop.run_until_complete(asyncio.gather(*coros))) 
loop.close() 
+0

@ j-f-sebastian Umm ... Sono confuso su quale sia la differenza tra i frammenti di codice n. 2 e n. 3 nella tua risposta. Puoi per favore indicare alcune risorse o spiegare cosa significa "viene pubblicato ... ** nella stessa discussione **" significa? BTW, grazie mille per # 2 :) –

+1

@SaheelGodhane: la soluzione basata su 'multiprocessing.dummy.Pool()' usa * multiple * (diversi/più di uno) thread. La soluzione basata su 'asyncio' usa un * singolo * thread qui. Per capire come fare diverse cose contemporaneamente * nella stessa discussione *, vedi [Concorrenza di Python dal vivo: LIVE!] (Http://www.youtube.com/watch?v=MCs5OvhV9S4) – jfs

+0

Eccellente esempio! Ho provato a implementare frammenti # 1 con la nuova funzionalità subprocess.run() ma sembra che non funzioni perché quella funzione attende sempre il completamento del processo. Ho dovuto tornare a utilizzare Popen invece. – Jared