2012-09-02 11 views
8

Sono un'applicazione di debug che raccoglie informazioni da 2 sensori: una webcam e un microfono.Cosa potrebbe creare un blocco connection.send()? (da conn1, conn2 = multiprocessing.Pipe())

L'architettura generale è abbastanza semplice:

  • il processo principale invia messaggi (start, stop, get_data) tramite tubi ai processi figli (uno per ciascuno).
  • processi figli raccolgono i dati e inviarlo al processo principale

Bambino & processi principali sono in loop infinito per elaborare i comandi (il processo principale da parte dell'utente, il processo figlio dal processo principale).

Funziona globalmente ma ho problemi a fermare i processi figli.

Ho registrato il codice e sembra accadere 2 cose:

  1. il messaggio 'stop' viene inviato, ma non ottiene attraverso il tubo.
  2. Il processo figlio continua a inviare dati e i blocchi conn.send (dati).

Il comportamento è chiaramente collegato allo stato della connessione, poiché i processi figlio che non restituiscono nulla non hanno questo comportamento. Tuttavia, non vedo come eseguire il debug/modificare l'architettura corrente che sembra ragionevole.

Quindi, che causa questo comportamento di blocco e come evitarlo?

Questo è il codice che viene eseguito per ogni iterazione del ciclo infinito nel processo figlio:

def do(self): 
     while self.cnx.poll(): 
      msg = self.cnx.recv() 
      self.queue.append(msg) 
     #== 
     if not self.queue: 
      func_name = 'default_action' 
      self.queue.append([func_name, ]) 
     #== 
     msg    = self.queue.pop() 
     func_name, args = msg[0], msg[1:] 
     #== 
     res = self.target.__getattribute__(func_name)(*args) 
     #== 
     running = func_name != 'stop' 
     #== 
     if res and self.send: 
      assert running 
      self.output_queue.append(res[0]) 
     if self.output_queue and running: 
      self.cnx.send(self.output_queue.popleft()) 
     #== 
     return running 

aggiornamento: sembra che il tubo non può essere scritto contemporaneamente su entrambe le estremità. Funziona se cambiare le ultime righe del codice qui sopra per:

 if self.output_queue and running: 
      if not self.cnx.poll(): 
       self.cnx.send(self.output_queue.popleft()) 

La domanda rimane aperta anche se, come tubi sono documentati come full duplex di default e questo comportamento non è documentato a tutti. Devo aver frainteso qualcosa. Per favore, illuminami!

aggiornamento 2: solo per essere chiari, nessuna connessione viene chiusa durante questa situazione. Per descrivere la sequenza di eventi:

  • il processo principale invia una messsage ("stop") (si svuota la connessione prima di inviare il messaggio)
  • il processo principale immettere un ciclo (infinito) che si ferma quando il processo figlio è terminato.
  • nel frattempo, il processo figlio è bloccato nell'invio e non ottiene mai il messaggio.
+1

Potresti pubblicare il tuo codice? Altrimenti, è difficile risolvere il tuo problema. –

+0

Ho aggiunto il codice del processo figlio. (Il genitore sta semplicemente inviando il messaggio). – LBarret

+1

Su quale sistema operativo si sta eseguendo? 'multiprocessing.Pipe' è implementato in modo piuttosto diverso (e con una semantica diversa, penso) su Windows. – Blckknght

risposta

4

Un full duplex multiprocessing.Pipe è implementato come socketpair(). Chiamare .send può bloccare per tutti i normali motivi quando si parla con una presa.In base alla tua descrizione, penso che sia probabile che il lettore del tuo Pipe abbia smesso di leggere e che i dati si siano accumulati nei buffer del kernel fino al punto in cui i tuoi blocchi .send.

Se si specifica esplicitamente il numero .close dal lato ricevente, si otterrà probabilmente qualche tipo di errore (anche se probabilmente SIGPIPE, non sicuro) quando si tenta di effettuare il .send. Se la tua connessione ricevente stava andando fuori campo questo probabilmente accadrebbe automaticamente. Potresti essere in grado di risolvere il problema semplicemente facendo attenzione a non memorizzare riferimenti (diretti o indiretti) al lato ricevente in modo che venga deallocato quando quel thread scompare.

Trivial demo di blocco .send:

import multiprocessing 

a, b = multiprocessing.Pipe() 

while True: 
    print "send!" 
    a.send("hello world") 

Ora notare che dopo un po 'si chiude la stampa "Invia!"

+0

L'ho considerato ma sia il ricevitore che il mittente sono ancora vivi e nessuna connessione è chiusa. Ho aggiornato il testo principale. – LBarret

Problemi correlati