2015-07-27 13 views
8

Ho una domanda comprendere la coda nel modulo multiprocessing in Python 3coda Python 3 Multiprocessing situazione di stallo quando si chiama unirsi prima che la coda è vuota

Questo è quello che dicono nel programming guidelines:

Orso tenere presente che un processo che ha inserito gli elementi in una coda attende prima che termini fino a quando tutti gli elementi memorizzati nel buffer non vengono alimentati dal feeder ” feed nella pipe sottostante. (Il processo figlio può chiamare il metodo di Queue.cancel_join_thread della coda per evitare questo comportamento.)

Questo significa che ogni volta che si utilizza una coda è necessario fare in modo che tutti gli elementi che sono stati messi in coda sarà eventualmente rimosso prima che il processo venga unito. Altrimenti non si può essere certi che i processi che hanno gli elementi messi in coda terminino. Ricorda inoltre che i processi non demoniaci di verranno uniti automaticamente.

Un esempio che deadlock è la seguente:

from multiprocessing import Process, Queue 

def f(q): 
    q.put('X' * 1000000) 

if __name__ == '__main__': 
    queue = Queue() 
    p = Process(target=f, args=(queue,)) 
    p.start() 
    p.join()     # this deadlocks 
    obj = queue.get() 

Una soluzione sarebbe di scambiare le ultime due righe (o semplicemente rimuovere la linea p.join()).

Quindi apparentemente, queue.get() non deve essere chiamato dopo un join().

Tuttavia ci sono esempi di utilizzo code in cui get è chiamato dopo un join come:

import multiprocessing as mp 
import random 
import string 

# define a example function 
def rand_string(length, output): 
    """ Generates a random string of numbers, lower- and uppercase chars. """ 
    rand_str = ''.join(random.choice(
       string.ascii_lowercase 
       + string.ascii_uppercase 
       + string.digits) 
    for i in range(length)) 
     output.put(rand_str) 

if __name__ == "__main__": 
    # Define an output queue 
    output = mp.Queue() 

    # Setup a list of processes that we want to run 
    processes = [mp.Process(target=rand_string, args=(5, output)) 
        for x in range(2)] 

    # Run processes 
    for p in processes: 
     p.start() 

    # Exit the completed processes 
    for p in processes: 
     p.join() 

    # Get process results from the output queue 
    results = [output.get() for p in processes] 

    print(results) 

ho eseguire questo programma e funziona (pubblicato anche come una soluzione per la questione StackOverflow Python 3 - Multiprocessing - Queue.get() does not respond).

Qualcuno potrebbe aiutarmi a capire qual è la regola per la situazione di stallo?

risposta

15

L'implementazione della coda in multiprocessing che consente ai dati di essere trasferiti tra processi si basa su pipe OS standard.

tubi del sistema operativo non sono infinitamente lungo, in modo che il processo che mette in coda i dati potrebbero essere bloccati nel sistema operativo durante l'operazione put() fino a qualche altro processo utilizza get() per recuperare i dati dalla coda.

Per piccole quantità di dati, come quella dell'esempio, il processo principale può eseguire join() tutti i sottoprocessi generati e quindi raccogliere i dati. Questo spesso funziona bene, ma non scala, e non è chiaro quando si romperà.

Ma si romperà sicuramente con grandi quantità di dati. Il sottoprocesso verrà bloccato in put() in attesa che il processo principale rimuova alcuni dati dalla coda con get(), ma il processo principale è bloccato in join() in attesa che il sottoprocesso termini. Ciò si traduce in un deadlock.

Ecco un esempio in cui un utente aveva this exact issue. Ho postato del codice in una risposta che lo ha aiutato a risolvere il suo problema.

0

Non chiamare join() su un oggetto processo prima di aver ricevuto tutti i messaggi dalla coda condivisa.

Ho usato seguente soluzione per consentire processi per uscire prima di elaborare tutti i suoi risultati:

results = [] 
while True: 
    try: 
     result = resultQueue.get(False, 0.01) 
     results.append(result) 
    except queue.Empty: 
     pass 
    allExited = True 
    for t in processes: 
     if t.exitcode is None: 
      allExited = False 
      break 
    if allExited & resultQueue.empty(): 
     break 

Può essere ridotto ma lasciato più tempo per essere più chiari per i neofiti.

Qui resultQueue è lo multiprocess.Queue che è stato condiviso con gli oggetti multiprocess.Process. Dopo questo blocco di codice si otterrà l'array result con tutti i messaggi dalla coda.

Il problema è che il buffer di input della pipe della coda che riceve i messaggi potrebbe diventare pieno causando un blocco infinito di writer fino a quando non ci sarà spazio sufficiente per ricevere il messaggio successivo. In modo da avere tre modi per evitare il blocco:

  • aumentare le dimensioni multiprocessing.connection.BUFFER (non così buono)
  • Diminuzione dimensione del messaggio o la sua quantità (non così buono)
  • recuperare immediatamente come vengono i messaggi dalla coda (buon modo)
Problemi correlati