2009-05-27 26 views
11

Sto scrivendo un programma di server con un produttore e molteplici consumatori, quello che mi confonde è solo il primo produttore compito mettere in coda ottiene consumato, dopo la quale le attività accodato non è più ottenere consumato, rimangono nella coda per sempre.produttore problema/consumatore con python multiprocessing

from multiprocessing import Process, Queue, cpu_count 
from http import httpserv 
import time 

def work(queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(5) 
     print "task done:", task 
    queue.put(None) 

class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     self.workers = [Process(target=work, args=(self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     httpserv(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESSES): 
      self.workers[i].join() 
     queue.close() 

Manager().start() 

Il produttore è un server HTTP che ha messo un compito nella coda una volta riceverà una richiesta da parte dell'utente. Sembra che i processi di consumo siano ancora bloccati quando ci sono nuove attività nella coda, il che è strano.

P.S. Altre due domande non relative a quanto sopra, non sono sicuro che se è meglio mettere il server HTTP nel proprio processo diverso dal processo principale , se sì, come posso fare in modo che il processo principale continui a funzionare prima che tutti i processi figli terminino. Seconda domanda, qual è il modo migliore per bloccare il server HTTP con garbo?

Edit: aggiungere codice del produttore, è solo un semplice server pitone WSGI:

import fapws._evwsgi as evwsgi 
from fapws import base 

def httpserv(queue): 
    evwsgi.start("0.0.0.0", 8080) 
    evwsgi.set_base_module(base) 

    def request_1(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_1') 
     return ["request 1!"] 

    def request_2(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_2') 
     return ["request 2!!"] 

    evwsgi.wsgi_cb(("/request_1", request_1)) 
    evwsgi.wsgi_cb(("/request_2", request_2)) 

    evwsgi.run() 

risposta

7

Penso che ci deve essere qualcosa di sbagliato con la parte server web, come questo funziona perfettamente:

from multiprocessing import Process, Queue, cpu_count 
import random 
import time 


def serve(queue): 
    works = ["task_1", "task_2"] 
    while True: 
     time.sleep(0.01) 
     queue.put(random.choice(works)) 


def work(id, queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(0.05) 
     print "%d task:" % id, task 
    queue.put(None) 


class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     print "starting %d workers" % self.NUMBER_OF_PROCESSES 
     self.workers = [Process(target=work, args=(i, self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     serve(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESS): 
      self.workers[i].join() 
     queue.close() 


Manager().start() 
uscita

Esempio:

starting 2 workers 
0 task: task_1 
1 task: task_2 
0 task: task_2 
1 task: task_1 
0 task: task_1 
+0

Impressionante mentre se si potrebbe fornire un esempio produttore + multi-lavoratore. Sarebbe bello. –

4

"Seconda domanda, qual è il modo migliore per fermare il server HTTP con garbo?"

Questo è difficile.

si hanno due scelte per la Comunicazione interprocesso:

  • controlli Out-of-band. Il server ha un altro meccanismo per la comunicazione. Un altro socket, un segnale Unix o qualcos'altro. Il qualcos'altro potrebbe essere un file "stop-now" nella directory locale del server. Sembra strano, ma funziona bene ed è più semplice dell'introduzione di un ciclo di selezione per l'ascolto su più socket o un gestore di segnale per catturare un segnale Unis.

    Il file "stop-now" è facile da implementare. Il ciclo evwsgi.run() controlla semplicemente questo file dopo ogni richiesta. Per fermare il server, si crea il file, si esegue una richiesta /control (che otterrà un errore 500 o qualcosa del genere, non ha molta importanza) e il server dovrebbe fermarsi. Ricordarsi di cancellare il file stop-now, altrimenti il ​​server non si riavvierà.

  • Controlli in banda. Il server ha un altro URL (/stop) che lo interromperà. Superficialmente, questo sembra un incubo per la sicurezza, ma dipende interamente da dove e come verrà usato questo server. Poiché sembra essere un semplice wrapper attorno a una coda di richieste interne, questo URL extra funziona bene.

    Per eseguire questa operazione, è necessario scrivere la propria versione di evwsgi.run() che può essere terminata impostando alcune variabili in modo da uscire dal ciclo.

Modifica

Probabilmente non si vuole risolvere il suo server, dato che non si conosce lo stato del suo thread di lavoro. Devi segnalare il server e quindi devi solo aspettare che finisca le cose normalmente.

Se si vuole uccidere forzatamente il server, allora os.kill() (o multiprocessing.terminate) funzionerà. Tranne, naturalmente, non sai cosa stessero facendo i fili del bambino.

+0

Che ne dici di mettere il server nel proprio processo e utilizzare il metodo multiprocessing.Process.terminate per terminare il processo? Questo sembra più facile. – btw0