Mi piacerebbe sapere come viene eseguita correttamente la multiprocessing. Supponendo che ho una lista [1,2,3,4,5]
generata dalla funzione f1
che è scritta su un Queue
(cerchio verde a sinistra). Ora avvio due processi estraendo da quella coda (eseguendo f2
nei processi). Elaborano i dati, ad esempio: raddoppiano il valore e lo scrivono nella seconda coda. Ora, la funzione f3
legge questi dati e li stampa.Elaborazione multiprocessing in una pipeline completata a destra
All'interno delle funzioni non v'è una sorta di un ciclo, cercando di leggere dalla coda per sempre. Come posso interrompere questo processo?
Idea 1
f1
non solo invia la lista, ma anche un oggetto o un oggetto None
custon, class PipelineTerminator: pass
o qualche che è appena propaga fino in fondo. f3
ora aspetta che None
venga, quando è lì, si interrompe. Problema: è possibile che uno dei due f2
legge e propaga lo None
mentre l'altro sta ancora elaborando un numero. Quindi l'ultimo valore è perso.
Idea 2
f3
è f1
. Quindi la funzione f1
genera i dati e le pipe, genera i processi con f2
e alimenta tutti i dati. Dopo lo spawning e l'alimentazione, ascolta la seconda pipe, semplicemente contando ed elaborando gli oggetti ricevuti. Perché sa quanti dati alimentati, può terminare i processi che eseguono f2
. Ma se l'obiettivo è impostare una pipeline di elaborazione, i diversi passaggi dovrebbero essere separabili. Quindi f1
, f2
e f3
sono diversi elementi di una pipeline e i passaggi costosi sono eseguiti in parallelo.
Idea 3
Ogni parte della pipeline è una funzione, questa funzione genera processi come ama ed è responsabile per la loro gestione. Sa, quanti dati sono arrivati e quanti dati sono stati restituiti (con yield
forse). Quindi è sicuro propagare un oggetto None
.
setup child processes
execute thread one and two and wait until both finished
thread 1:
while True:
pull from input queue
if None: break and set finished_flag
else: push to queue1 and increment counter1
thread 2:
while True:
pull from queue2
increment counter2
yield result
if counter1 == counter2 and finished_flag: break
when both threads finished: kill process pool and return.
(Invece di usare le discussioni, forse si può pensare di una soluzione più intelligente.)
Quindi ...
Ho implementato una soluzione seguente idea 2, l'alimentazione e in attesa di i risultati per arrivare, ma non era davvero una pipeline con funzioni indipendenti collegate. Ha funzionato per il compito che dovevo gestire, ma era difficile da mantenere.
Mi piacerebbe sapere da voi come implementate le pipeline (in un processo semplice con le funzioni del generatore e così via, ma con più processi?) E gestirle di solito.
Ma come dovrebbero i lavoratori di 'f2' * sapere * che è l'ultimo? 'f1' deve sapere quanti lavoratori ci sono e inviare quel numero di oggetti personalizzati. Fatto così, è garantito che ogni lavoratore riceve questa notifica. Questo è chiaramente possibile, ma poi non posso "collegare semplicemente le funzioni", ho bisogno di sapere quanti lavoratori ci sono in ogni fase. Ecco perché mi piace l'idea 3. E grazie per le cose "concorrenti", questo è nuovo per me e ci scaverò dentro. –
Questo è anche il motivo per cui ho controllato "accetta" :) –
Poiché l'oggetto personalizzato "smette di funzionare" viene inviato da "F1", può includere il numero totale di processi di lavoro "f2". Se questi passano l'oggetto "stop working" a "f3", viene a conoscenza del numero totale di lavoratori. Più informazioni potrebbero essere inviate in questo modo - quindi una cosa importante è avere un "livello di controllo" almeno in "f3" (ma probabilmente anche in "f1") che si preoccuperà solo di questo e passerà semplicemente qualsiasi "messaggio" non oggetti in coda per essere effettivamente processati. – jsbueno