2012-03-24 19 views
5

Ho una funzione di downloader che scarica più file in parallelo. Io uso multiprocessing.Pool.map_async per scaricare pezzi diversi dello stesso file. Vorrei mostrare una barra di stato del download. Per questo, ho bisogno di conoscere i byte totali che sono già stati scaricati (total_bytes_dl).Condivisione di una variabile tra processi

pool = multiprocessing.Pool(processes) 
    mapObj = pool.map_async(f, args) 

    while not mapObj.ready(): 
     status = r"%.2f MB/%.2f MB" % (total_bytes_dl/1024.0/1024.0, filesize/1024.0/1024.0,) 
     status = status + chr(8)*(len(status)+1) 
     print status, 
     time.sleep(0.5) 

C'è un modo per impostare una variabile che verrà condivisa tra tutti questi processi e il processo principale, in modo che ogni processo può accodare la quantità di byte che ha appena scaricato?

risposta

3

La soluzione era quella di intilize il nuovo processo e passare il valore ctypes comune:

from ctypes import c_int 
import dummy 

shared_bytes_var = multiprocessing.Value(c_int) 

def Func(...): 
    .... 
    pool = multiprocessing.Pool(initializer=_initProcess,initargs=(shared_bytes_var,)) 
    .... 

def _initProcess(x): 
    dummy.shared_bytes_var = x 
1

Certo, è possibile utilizzare i valori condivisi ctypes nella memoria condivisa, se si desidera solo che i byte scaricati debbano essere eseguiti. passare il valore pertinente per ciascun lavoratore e il processo chiamante avrà accesso ad esso.

vedere: http://docs.python.org/library/multiprocessing.html#shared-ctypes-objects

+2

non è possibile mappare l'oggetto condiviso ctypes: 'RuntimeError: gli oggetti sincronizzati devono essere condivisi solo tra processi tramite l'ereditarietà ' – iTayb

0

si potrebbe usare un oggetto Queue multiprocesso che i lavoratori potrebbero utilizzare per inviare i dati di stato su. Il tuo processo principale dovrà leggere le voci di stato dalla coda e aggiornare di conseguenza lo stato.

1

Utilizzare un oggetto Queue allocato in questo modo:

que = multiprocessing.Manager().Queue() 

passare questa variabile per i lavoratori, e loro possibile utilizzare que.put(bytes) a per segnalare periodicamente quanto hanno scaricato dal loro ultimo rapporto. È poi basta controllare la dimensione della coda e tirare in tutti i rapporti in arrivo:

downloaded = 0 
while not mapObj.ready(): 
    for _ in range(q.qsize()): 
     downloaded += q.get() 
    print downloaded, r"bytes downloaded\r", 
    time.sleep(0.5) 

Nota: Anche se il modulo fornisce anche un metodo multiprocessing.Queue(), non è del tutto equivalente a multiprocessing.Manager().Queue(). Vedi this question e la risposta.

Problemi correlati