2015-05-12 7 views
8

Desidero utilizzare l'oggetto multiprocessing.Manager() in modo che sia possibile inviare in modo asincrono le informazioni da un worker al manager per inviare informazioni a un server. Quello che ho sono circa 10 istanze che scrivono i PDF sul disco. Quindi volevo utilizzare l'oggetto manager nel pacchetto multiprocessing per inviare quei dati al mio bucket S3 perché non voglio mantenere la generazione del contenuto locale.Progettazione corretta di un multiprocessing.Manager Oggetto personalizzato

Quindi mi chiedevo se creare un oggetto gestore personalizzato, è questo il modo corretto di farlo? Ogni processo inviato all'oggetto gestore viene messo in coda? oppure se chiamo più caricamenti, il gestore interromperà alcune chiamate?

Di seguito è riportato un codice di esempio di quello che sto pensando di fare:

from multiprocessing.managers import BaseManager 

class UploadClass(object): 
    def upload(self, filePath, params, destUrl): 
     # do stuff 
     return results 

class MyManager(BaseManager): 
    pass 

MyManager.register('uploads', UploadClass) 

if __name__ == '__main__': 
    manager = MyManager() 
    manager.start() 
    upload = manager.uploads() 
    # do this wait for completion or do they perform this async 
    print upload.upload(r"<path>", {...}, "some url") 
    print upload.upload(r"<path>", {...}, "some url") 
+0

Giusto per chiarire: si desidera avere dieci processi diversi (sono queste istanze uniche dello stesso script Python, o solo multiprocessing.Process istanze generate all'interno di uno script?), che scrivono tutti i PDF su disco. Una volta terminata la scrittura, ogni istanza invierà il percorso del file a un singolo 'multiprocessing.Manager', che dovrebbe caricare i file uno alla volta (ovvero senza caricamenti paralleli). È giusto? – dano

+0

Inoltre, ti interessa ottenere un risultato dal processo di caricamento? O vuoi solo sparare il caricamento in background e non pensarci più? – dano

+0

@dano: sarebbe utile recuperare un messaggio di qualche tipo dal processo per garantire il corretto funzionamento del processo. –

risposta

2

Per rispondere direttamente alcune delle vostre domande:

Will ogni processo presentato al gestore di oggetti ottenere in coda?

Il server Manager genera un nuovo thread per gestire ogni richiesta in entrata, quindi tutte le richieste inizieranno a essere gestite immediatamente. Si può vedere questo all'interno di multiprocessing/managers.py:

def serve_forever(self): 
    ''' 
    Run the server forever 
    ''' 
    current_process()._manager_server = self 
    try: 
     try: 
      while 1: 
       try: 
        c = self.listener.accept() 
       except (OSError, IOError): 
        continue 
       t = threading.Thread(target=self.handle_request, args=(c,)) 
       t.daemon = True 
       t.start() 
     except (KeyboardInterrupt, SystemExit): 
      pass 
    finally: 
     self.stop = 999 
     self.listener.close() 

se chiamo upload multipli, sarà il manager cadere alcune delle chiamate?

No, nessuna delle chiamate verrà interrotta.

# do this wait for completion or do they perform this async 
print upload.upload(r"<path>", {...}, "some url") 
print upload.upload(r"<path>", {...}, "some url") 

Entrambe le chiamate verso upload.upload saranno sincrono; non verranno restituiti fino al completamento di UploadClass.upload. Tuttavia, se si dovessero avere più script/thread/processi che chiamano upload.upload contemporaneamente, ogni chiamata univoca si verificherà simultaneamente all'interno del proprio thread nel processo del server Manager.

E il tuo più domanda più importante:

è questo il modo corretto di fare questo?

Direi di no, se ho capito bene la domanda. Se v'è solo un copione, e poi depongono le uova dieci multiprocessing.Process istanze all'interno che uno script per scrivere i PDF, quindi si dovrebbe semplicemente usare un altro multiprocessing.Process per gestire i file caricati:

def upload(self, q): 
    for payload in iter(q.get, None): # Keep getting from the queue until a None is found 
     filePath, params, destUrl = payload 
     # do stuff 

def write_pdf(pdf_file_info, q): 
    # write a pdf to disk here 
    q.put((filepath, params, destUrl)) # Send work to the uploader 
    # Move on with whatever comes next. 

if __name__ == '__main__': 
    pdf_queue = multiprocessing.Queue() 

    # Start uploader 
    upload_proc = multiprocessing.Process(upload, args=(pdf_queue,)) 
    upload_proc.start() 

    # Start pdf writers 
    procs = [] 
    for pdf in pdfs_to_write: 
     p = multiprocessing.Process(write_pdf, args=(pdf, pdf_queue)) 
     p.start() 
     p.append(procs) 

    # Wait for pdf writers and uploader to finish. 
    for p in procs: 
     p.join() 
    pdf_queue.put(None) # Sending None breaks the for loop inside upload 
    upload_proc.join() 

Se si sta effettivamente ok con concomitante caricamenti, quindi non è necessario avere un processo separato - basta caricare direttamente dai processi di scrittura in pdf.

È difficile capire dalla tua domanda se questo è esattamente quello che stai facendo, però. Una volta chiarito, aggiusterò questo ultimo pezzo per adattarlo al caso d'uso specifico.

+0

Perché generare processi per mettere i dati in coda invece di mettere direttamente dal processo principale? – sirfz

+0

@Sir_FZ L'OP ha dichiarato di avere più istanze che scrivono PDF: * "Quello che ho sono circa 10 istanze che scrivono PDF su disco" *. Quindi ci sono più lavoratori che finiscono per mettere gli oggetti in coda in parallelo. – dano

+0

Buon punto. Ma quando si utilizza Manager, l'OP ha il vantaggio di elaborare più caricamenti simultaneamente (poiché il Manager forca un thread per ogni richiesta) e poiché coinvolge l'IO, si applica la concorrenza. Mentre sei nella tua soluzione, hai un singolo processo che gestisce gli upload in sequenza. Suggerisco di utilizzare un ThreadPool nel processo di caricamento per la gestione asincrona delle richieste di caricamento inserite nella coda. – sirfz

Problemi correlati