2011-11-26 13 views
10

Mi piacerebbe sapere come viene eseguita correttamente la multiprocessing. Supponendo che ho una lista [1,2,3,4,5] generata dalla funzione f1 che è scritta su un Queue (cerchio verde a sinistra). Ora avvio due processi estraendo da quella coda (eseguendo f2 nei processi). Elaborano i dati, ad esempio: raddoppiano il valore e lo scrivono nella seconda coda. Ora, la funzione f3 legge questi dati e li stampa.Elaborazione multiprocessing in una pipeline completata a destra

layout of the data flow

All'interno delle funzioni non v'è una sorta di un ciclo, cercando di leggere dalla coda per sempre. Come posso interrompere questo processo?

Idea 1

f1 non solo invia la lista, ma anche un oggetto o un oggetto None custon, class PipelineTerminator: pass o qualche che è appena propaga fino in fondo. f3 ora aspetta che None venga, quando è lì, si interrompe. Problema: è possibile che uno dei due f2 legge e propaga lo None mentre l'altro sta ancora elaborando un numero. Quindi l'ultimo valore è perso.

Idea 2

f3 è f1. Quindi la funzione f1 genera i dati e le pipe, genera i processi con f2 e alimenta tutti i dati. Dopo lo spawning e l'alimentazione, ascolta la seconda pipe, semplicemente contando ed elaborando gli oggetti ricevuti. Perché sa quanti dati alimentati, può terminare i processi che eseguono f2. Ma se l'obiettivo è impostare una pipeline di elaborazione, i diversi passaggi dovrebbero essere separabili. Quindi f1, f2 e f3 sono diversi elementi di una pipeline e i passaggi costosi sono eseguiti in parallelo.

Idea 3

pipeline idea 3

Ogni parte della pipeline è una funzione, questa funzione genera processi come ama ed è responsabile per la loro gestione. Sa, quanti dati sono arrivati ​​e quanti dati sono stati restituiti (con yield forse). Quindi è sicuro propagare un oggetto None.

setup child processes 

execute thread one and two and wait until both finished 

thread 1: 
    while True: 
     pull from input queue 
     if None: break and set finished_flag 
     else: push to queue1 and increment counter1 

thread 2: 
    while True: 
     pull from queue2 
     increment counter2 
     yield result 
     if counter1 == counter2 and finished_flag: break 

when both threads finished: kill process pool and return. 

(Invece di usare le discussioni, forse si può pensare di una soluzione più intelligente.)

Quindi ...

Ho implementato una soluzione seguente idea 2, l'alimentazione e in attesa di i risultati per arrivare, ma non era davvero una pipeline con funzioni indipendenti collegate. Ha funzionato per il compito che dovevo gestire, ma era difficile da mantenere.

Mi piacerebbe sapere da voi come implementate le pipeline (in un processo semplice con le funzioni del generatore e così via, ma con più processi?) E gestirle di solito.

risposta

1

Cosa sarebbe sbagliato utilizzare l'idea 1, ma con ogni processo di lavoro (f2) inserire un oggetto personalizzato con il suo identificatore una volta terminato?Quindi f3, terminerebbe quel lavoratore, fino a quando non è rimasto alcun processo di lavoro.

Inoltre, nuovo in Python 3.2 è il pacchetto concurrent.futures sulla libreria standard, che dovrebbe fare quello che si sta cercando di nel "modo giusto" (tm) - http://docs.python.org/dev/library/concurrent.futures.html

Forse è possibile trova un backport di concurrent.futures in Python 2.x series.

+0

Ma come dovrebbero i lavoratori di 'f2' * sapere * che è l'ultimo? 'f1' deve sapere quanti lavoratori ci sono e inviare quel numero di oggetti personalizzati. Fatto così, è garantito che ogni lavoratore riceve questa notifica. Questo è chiaramente possibile, ma poi non posso "collegare semplicemente le funzioni", ho bisogno di sapere quanti lavoratori ci sono in ogni fase. Ecco perché mi piace l'idea 3. E grazie per le cose "concorrenti", questo è nuovo per me e ci scaverò dentro. –

+0

Questo è anche il motivo per cui ho controllato "accetta" :) –

+0

Poiché l'oggetto personalizzato "smette di funzionare" viene inviato da "F1", può includere il numero totale di processi di lavoro "f2". Se questi passano l'oggetto "stop working" a "f3", viene a conoscenza del numero totale di lavoratori. Più informazioni potrebbero essere inviate in questo modo - quindi una cosa importante è avere un "livello di controllo" almeno in "f3" (ma probabilmente anche in "f1") che si preoccuperà solo di questo e passerà semplicemente qualsiasi "messaggio" non oggetti in coda per essere effettivamente processati. – jsbueno

1

Per Idea 1, come circa:

import multiprocessing as mp 

sentinel=None 

def f2(inq,outq): 
    while True: 
     val=inq.get() 
     if val is sentinel: 
      break 
     outq.put(val*2) 

def f3(outq): 
    while True: 
     val=outq.get() 
     if val is sentinel: 
      break 
     print(val) 

def f1(): 
    num_workers=2 
    inq=mp.Queue() 
    outq=mp.Queue() 
    for i in range(5): 
     inq.put(i) 
    for i in range(num_workers):   
     inq.put(sentinel) 
    workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)] 
    printer=mp.Process(target=f3,args=(outq,)) 
    for w in workers: 
     w.start() 
    printer.start() 
    for w in workers: 
     w.join() 
    outq.put(sentinel) 
    printer.join() 

if __name__=='__main__': 
    f1() 

L'unica differenza dalla descrizione di Idea 1 è che f2 scoppia della while-loop quando riceve la sentinella (ponendo così stesso). f1 blocchi fino a quando i lavoratori non hanno finito (utilizzando w.join()) e quindi invia f3 la sentinella (segnalando che si interrompe dal suo while-loop).

+0

Grazie, è simile all'approccio che ho finito per implementare, ma la tua versione è molto leggibile. Quello che non mi piace è il fatto che ogni componente della pipeline deve sapere qualcosa sulla pipeline, come in questo caso: 'printer' deve conoscere il numero di lavoratori nel passaggio precedente e così via. Ecco perché ho pensato di incapsulare questo e di fornire * ogni * passo della pipeline * esattamente un * input e un output e la ramificazione e la fusione avvengono in ogni passaggio. –

+0

Questo è un buon punto. Puoi rendere 'f3' indipendente da' num_workers' ma lasciando che 'f1' invii il sentinel dopo che il' worker' è finito. Ho modificato il post per mostrare cosa intendo. – unutbu

7

Con modulo MPipe, semplicemente fare questo:

from mpipe import OrderedStage, Pipeline 

def f1(value): 
    return value * 2 

def f2(value): 
    print(value) 

s1 = OrderedStage(f1, size=2) 
s2 = OrderedStage(f2) 
p = Pipeline(s1.link(s2)) 

for task in 1, 2, 3, 4, 5, None: 
    p.put(task) 

Le piste sopra 4 processi:

  • due per la prima fase (funzione f1)
  • uno per il secondo stadio (functi su f2)
  • e uno altro per il programma principale che alimenta la pipeline.

MPipe cookbook Il MPipe cookbook offre alcune spiegazioni su come i processi vengono arrestati internamente utilizzando None come ultima operazione.

Per eseguire il codice, installare MPipe:

virtualenv venv 
venv/bin/pip install mpipe 
venv/bin/python prog.py 

uscita:

2 
4 
6 
8 
10 
+0

Sembra buono, almeno l'esempio introduttivo! Bel logo, a proposito. –

0

Il modo più semplice per fare esattamente questo sta usando semafori.

F1

F1 sta popolando il tuo 'coda' con i dati che si desidera elaborare. Termina la fine di questa spinta, metti le parole chiave 'Stop' nella tua coda. n = 2 per il tuo esempio, ma solitamente il numero di lavoratori coinvolti. codice sarà simile:

for n in no_of_processes: 
    tasks.put('Stop') 

F2

F2 sta tirando dalla coda fornita da un get -command. L'elemento viene prelevato dalla coda ed eliminato nella coda.Ora, si può mettere la comparsa in un ciclo while prestando attenzione al segnale di arresto:

for elem in iter(tasks.get, 'STOP'): 
    do something 

F3

Questo è un po 'complicato. È possibile generare un semaforo in F2 che funge da segnale per F3. Ma non sai quando arriva questo segnale e potresti perdere dati. Tuttavia, F3 estrae i dati allo stesso modo di F2 e puoi inserirli in una notifica try... except. queue.get solleva un queue.Empty quando non ci sono elementi nella coda. Così il vostro tirare in F3 sarà simile:

while control: 
    try: 
     results.get() 
    except queue.Empty: 
     control = False 

Con tasks e results essere code. Quindi non hai bisogno di nulla che non sia già incluso in Python.

0

Io uso concurent.futures e tre pool, che sono collegati insieme tramite future.add_done_callback. Quindi attendo che l'intero processo finisca chiamando lo shutdown in ogni piscina.

from concurrent.futures import ProcessPoolExecutor 
import time 
import random 


def worker1(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe12(future): 
    pool2.submit(worker2, future.result()).add_done_callback(pipe23) 


def worker2(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe23(future): 
    pool3.submit(worker3, future.result()).add_done_callback(spout) 


def worker3(arg): 
    time.sleep(random.random()) 
    return arg 


def spout(future): 
    print(future.result()) 


if __name__ == "__main__": 
    __spec__ = None # Fix multiprocessing in Spyder's IPython 
    pool1 = ProcessPoolExecutor(2) 
    pool2 = ProcessPoolExecutor(2) 
    pool3 = ProcessPoolExecutor(2) 
    for i in range(10): 
     pool1.submit(worker1, i).add_done_callback(pipe12) 
    pool1.shutdown() 
    pool2.shutdown() 
    pool3.shutdown() 
Problemi correlati