6

Recentemente ho iniziato a studiare 0MQ. All'inizio di oggi, mi sono imbattuto in un blog, Python Multiprocessing with ZeroMQ. Ha parlato di the ventilator pattern nella guida 0MQ di cui ho letto, quindi ho deciso di provarlo.Perché questo script Python 0MQ per calcolo distribuito si blocca su una dimensione di input fissa?

Invece di calcolare semplicemente prodotti di numeri da parte di lavoratori come fa il codice originale, ho deciso di provare a fare in modo che il ventilatore invii grandi matrici ai lavoratori tramite messaggi di 0mq. Quello che segue è il codice che ho utilizzato per i miei esperimenti "".

Come indicato in un commento di seguito, ogni volta che ho tentato di aumentare la variabile string_length in un numero maggiore di 3 MB, il codice si blocca.

sintomo tipico: Diciamo abbiamo impostato la string_length a 4MB (vale a dire 4194304), allora forse il manager risultato ottiene il risultato di un operaio, e quindi il codice si ferma solo. htop mostra che i 2 core non stanno facendo molto. Il monitoraggio del traffico di rete Etherape non mostra traffico sull'interfaccia lo.

Finora, dopo ore guardandomi intorno, non sono stato in grado di capire che cosa sta causando questo, e apprezzerei un suggerimento o due sul perché e qualsiasi risoluzione su questo problema. Grazie!

Sto utilizzando Ubuntu 11.04 64 bit su un notebook Dell con CPU Intel Core, 8 GB RAM, 80 GB Intel X25MG2 SSD, Python 2.7.1+, libzmq1 2.1.10-1chl1 ~ natty1, python-pyzmq 2.1.10- 1chl1 ~ natty1

import time 
import zmq 
from multiprocessing import Process, cpu_count 

np = cpu_count() 
pool_size = np 
number_of_elements = 128 
# Odd, why once the slen is bumped to 3MB or above, the code hangs? 
string_length = 1024 * 1024 * 3 

def create_inputs(nelem, slen, pb=True): 
    ''' 
    Generates an array that contains nelem fix-sized (of slen bytes) 
    random strings and an accompanying array of hexdigests of the 
    former's elements. Both are returned in a tuple. 

    :type nelem: int 
    :param nelem: The desired number of elements in the to be generated 
        array. 
    :type slen: int 
    :param slen: The desired number of bytes of each array element. 
    :type pb: bool 
    :param pb: If True, displays a text progress bar during input array 
       generation. 
    ''' 
    from os import urandom 
    import sys 
    import hashlib 

    if pb: 
     if nelem <= 64: 
      toolbar_width = nelem 
      chunk_size = 1 
     else: 
      toolbar_width = 64 
      chunk_size = nelem // toolbar_width 
     description = '%d random strings of %d bytes. ' % (nelem, slen) 
     s = ''.join(('Generating an array of ', description, '...\n')) 
     sys.stdout.write(s) 
     # create an ASCII progress bar 
     sys.stdout.write("[%s]" % (" " * toolbar_width)) 
     sys.stdout.flush() 
     sys.stdout.write("\b" * (toolbar_width+1)) 
    array = list() 
    hash4a = list() 
    try: 
     for i in range(nelem): 
      e = urandom(int(slen)) 
      array.append(e) 
      h = hashlib.md5() 
      h.update(e) 
      he = h.hexdigest() 
      hash4a.append(he) 
      i += 1 
      if pb and i and i % chunk_size == 0: 
       sys.stdout.write("-") 
       sys.stdout.flush() 
     if pb: 
      sys.stdout.write("\n") 
    except MemoryError: 
     print('Memory Error: discarding existing arrays') 
     array = list() 
     hash4a = list() 
    finally: 
     return array, hash4a 

# The "ventilator" function generates an array of nelem fix-sized (of slen 
# bytes long) random strings, and sends the array down a zeromq "PUSH" 
# connection to be processed by listening workers, in a round robin load 
# balanced fashion. 

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    time.sleep(1) 

# The "worker" functions listen on a zeromq PULL connection for "work" 
# (array to be processed) from the ventilator, get the length of the array 
# and send the results down another zeromq PUSH connection to the results 
# manager. 

def worker(wrk_num): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive work from the ventilator 
    work_receiver = context.socket(zmq.PULL) 
    work_receiver.connect("tcp://127.0.0.1:5557") 

    # Set up a channel to send result of work to the results reporter 
    results_sender = context.socket(zmq.PUSH) 
    results_sender.connect("tcp://127.0.0.1:5558") 

    # Set up a channel to receive control messages over 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Set up a poller to multiplex the work receiver and control receiver channels 
    poller = zmq.Poller() 
    poller.register(work_receiver, zmq.POLLIN) 
    poller.register(control_receiver, zmq.POLLIN) 

    # Loop and accept messages from both channels, acting accordingly 
    while True: 
     socks = dict(poller.poll()) 

     # If the message came from work_receiver channel, get the length 
     # of the array and send the answer to the results reporter 
     if socks.get(work_receiver) == zmq.POLLIN: 
      #work_message = work_receiver.recv_json() 
      work_message = work_receiver.recv_pyobj() 
      length = len(work_message['num'][0]) 
      answer_message = { 'worker' : wrk_num, 'result' : length } 
      results_sender.send_json(answer_message) 

     # If the message came over the control channel, shut down the worker. 
     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Worker %i received FINSHED, quitting!" % wrk_num) 
       break 

# The "results_manager" function receives each result from multiple workers, 
# and prints those results. When all results have been received, it signals 
# the worker processes to shut down. 

def result_manager(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive results 
    results_receiver = context.socket(zmq.PULL) 
    results_receiver.bind("tcp://127.0.0.1:5558") 

    # Set up a channel to send control commands 
    control_sender = context.socket(zmq.PUB) 
    control_sender.bind("tcp://127.0.0.1:5559") 

    for task_nbr in range(np): 
     result_message = results_receiver.recv_json() 
     print "Worker %i answered: %i" % (result_message['worker'], result_message['result']) 

    # Signal to all workers that we are finsihed 
    control_sender.send("FINISHED") 
    time.sleep(5) 

if __name__ == "__main__": 

    # Create a pool of workers to distribute work to 
    for wrk_num in range(pool_size): 
     Process(target=worker, args=(wrk_num,)).start() 

    # Fire up our result manager... 
    result_manager = Process(target=result_manager, args=()) 
    result_manager.start() 

    # Start the ventilator! 
    ventilator = Process(target=ventilator, args=()) 
    ventilator.start() 
+0

ho fatto altri esperimenti: ha abbassato i number_of_elements a 64 e aumentato la string_length a 6. Il codice ha funzionato ancora bene. Oltre a ciò, è apparso lo stesso sintomo. Questo mi ha portato a credere che ci potrebbe essere un limite di dimensioni del messaggio complessivo da qualche parte nel binding pyzmq. L'API 0MQ C ha questa funzione [link] (http://api.zeromq.org/2-1:zmq-msg-init-size) zmq_msg_init_size (3) che non riesco a trovare nella documentazione di pyzmq. Questa potrebbe essere la causa? – user183394

+0

È possibile ottenere un traceback in cui è sospeso? Potrebbe darti un suggerimento. –

+0

Ho provato il tuo codice sul mio Mac portatile con string_length = 1024 * 1024 * 4 e ha funzionato bene, quindi suppongo che debba avere qualcosa a che fare con qualche tipo di contesa della memoria. –

risposta

6

Il problema è che il ventilatore (PUSH) presa si chiude prima che sia fatto l'invio. Si ha un sonno di 1s alla fine della funzione di ventilazione, che non è sufficiente per inviare messaggi 384 MB. Ecco perché hai la soglia che hai, se il sonno fosse più corto allora la soglia sarebbe più bassa.

Detto questo, LINGER è supposto per prevenire questo genere di cose, quindi vorrei farlo presente con zeromq: PUSH non sembra rispettare LINGER.

Una correzione per il vostro esempio particolare (senza aggiungere un lungo sonno indeterminato) sarebbe quella di utilizzare lo stesso segnale FINISH per terminare il ventilatore come lavoratori. In questo modo, garantisci che il tuo respiratore sopravvive per tutto il tempo necessario.

ventilatore Revised:

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Set up a channel to receive control messages 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    # Poll for FINISH message, so we don't shutdown too early 
    poller = zmq.Poller() 
    poller.register(control_receiver, zmq.POLLIN) 

    while True: 
     socks = dict(poller.poll()) 

     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Ventilator received FINSHED, quitting!") 
       break 
      # else: unhandled message 
+0

minrk, molte grazie per la risposta perspicace. Molto utile! Non sospettavo il valore di ZMQ_LINGER impostato da zmq_setsockopt (3), poiché come hai detto, il valore predefinito è -1 (infinito). Ottima cattura! Solleverò sicuramente il problema prima con pyzmq e lo menzionerò anche sulla mailing list zeromq. Ho testato la tua correzione fino a string_length impostato su 1024 * 1024 * 10, ha aumentato la RAM fisica del mio notebook e ho comunque ottenuto il risultato previsto. Grazie ancora! – user183394

+3

Forse non vale la pena portarlo su con 'pyzmq gente', visto che in pratica sono io in questo momento. Ho eseguito il ping di libzmq e ho scritto un caso di test più semplice in C: https://gist.github.com/1643223 – minrk

Problemi correlati