2009-07-14 20 views

risposta

14

Non sembra che ci sia ancora un modo ufficiale per gestirlo. O almeno, non si basa su questo:

si potrebbe provare qualcosa di simile a ciò che questo post sta facendo - l'accesso ai filehandles tubi sottostanti:

e quindi selezionare.

+0

+1 Wow, bella scoperta! Il mio Google-fu sembra essere debole ... – cdleary

+1

sfortunatamente, il secondo URL non funziona più –

1

È possibile utilizzare qualcosa come il modello Observer, in cui i destinatari della coda vengono informati delle modifiche di stato.

In questo caso, si potrebbe avere il thread di lavoro designato come un ascoltatore su ogni coda, e ogni volta che riceve un segnale di pronto, si può lavorare sul nuovo elemento, altrimenti dormire.

+0

Bene, il 'get' è distruttivo, quindi non si può davvero fare osservazione sulla coda stessa come descritto da GoF. Il thread di dequeueing dovrebbe essere il "osservato" - speravo in un overhead minore rispetto a due thread aggiuntivi. – cdleary

+0

Inoltre, se volessi un singolo punto di accesso per il processo chiamante (come in 'select') avrei bisogno di una coda thread-safe in cima a quei due thread. – cdleary

2

Sembra che l'utilizzo di thread che inoltrano gli elementi in entrata a una singola coda in attesa su di essi sia una scelta pratica quando si utilizza il multiprocessing in modo indipendente dalla piattaforma.

L'evitamento dei thread richiede la gestione di pipe/FD di basso livello, entrambe specifiche della piattaforma e non facili da gestire in modo coerente con l'API di livello superiore.

Oppure avresti bisogno di Code con la possibilità di impostare i callback che ritengo siano l'interfaccia di livello superiore corretta per cui andare. Cioè si potrebbe scrivere qualcosa di simile:

 
    singlequeue = Queue() 
    incoming_queue1.setcallback(singlequeue.put) 
    incoming_queue2.setcallback(singlequeue.put) 
    ... 
    singlequeue.get() 

Forse il pacchetto di multiprocessing potrebbe crescere questa API, ma è ancora arrivati. Il concetto funziona bene con py.execnet che utilizza il termine "canale" invece di "code", vedi qui http://tinyurl.com/nmtr4w

+0

Questa sarebbe un'interfaccia molto bella! (Anche se chiaramente c'è il vantaggio di mantenere le interfacce stdlib strette, come Jesse menziona nel rapporto bug di @ars.) – cdleary

+0

true ma l'attuale API pubblica Queue non gestisce il tuo caso d'uso che penso sia comune. – hpk42

+0

Se è "comune" - presenta un bug report + patch (con test per l'amore di pete) su bugs.python.org e posso valutarlo per 2.7/3.x – jnoller

21

In realtà è possibile utilizzare multiprocessing.Queue objects in select.select. cioè

que = multiprocessing.Queue() 
(input,[],[]) = select.select([que._reader],[],[]) 

selezionerebbe que solamente se è pronto per essere letto da.

Nessuna documentazione a riguardo. Stavo leggendo il codice sorgente della libreria multiprocessing.queue (su Linux normalmente è come /usr/lib/python2.6/multiprocessing/queue.py) per scoprirlo.

Con Queue.Queue non ho trovato alcun modo intelligente per farlo (e mi piacerebbe molto).

+1

sembra non funzionare sotto Windows. – fluke

+3

Funziona alla grande su Unix, ma su Windows l'implementazione 'select.select' può gestire solo socket, non descrittori di file e quindi fallisce. –

+0

Qual è la differenza principale tra 'Queue.Queue' e' multiprocessing.Queue', e '' multiprocessing.Queue' può essere usato per il multithreading e non solo per il multiprocessing? – CMCDragonkai

1

Non sono sicuro se la selezione su una coda di elaborazione multipla funzioni su Windows. Come selezionare su windows ascolta socket e non handle di file, ho il sospetto che potrebbero esserci dei problemi.

La mia risposta è di creare un thread per ascoltare ogni coda in un blocco, e di mettere tutti i risultati in una singola coda ascoltata dal thread principale, essenzialmente multiplexando le singole code in una singola.

Il mio codice per fare questo è:

""" 
Allow multiple queues to be waited upon. 

queue,value = multiq.select(list_of_queues) 
""" 
import queue 
import threading 

class queue_reader(threading.Thread): 
    def __init__(self,inq,sharedq): 
     threading.Thread.__init__(self) 
     self.inq = inq 
     self.sharedq = sharedq 
    def run(self): 
     while True: 
      data = self.inq.get() 
      print ("thread reads data=",data) 
      result = (self.inq,data) 
      self.sharedq.put(result) 

class multi_queue(queue.Queue): 
    def __init__(self,list_of_queues): 
     queue.Queue.__init__(self) 
     for q in list_of_queues: 
      qr = queue_reader(q,self) 
      qr.start() 

def select(list_of_queues): 
    outq = queue.Queue() 
    for q in list_of_queues: 
     qr = queue_reader(q,outq) 
     qr.start() 
    return outq.get() 

La seguente routine di test mostra come usarlo:

import multiq 
import queue 

q1 = queue.Queue() 
q2 = queue.Queue() 

q3 = multiq.multi_queue([q1,q2]) 

q1.put(1) 
q2.put(2) 
q1.put(3) 
q1.put(4) 

res=0 
while not res==4: 
    while not q3.empty(): 
     res = q3.get()[1] 
     print ("returning result =",res) 

Spero che questo aiuti.

Tony Wallace

1

nuova versione del codice di cui sopra ...

Non so quanto bene il select in una coda multiprocessing funziona su Windows. Come selezionare su windows ascolta socket e non handle di file, ho il sospetto che potrebbero esserci dei problemi.

La mia risposta è di creare un thread per ascoltare ogni coda in un blocco, e di mettere tutti i risultati in una singola coda ascoltata dal thread principale, essenzialmente multiplexando le singole code in una singola.

Il mio codice per fare questo è:

""" 
Allow multiple queues to be waited upon. 

An EndOfQueueMarker marks a queue as 
    "all data sent on this queue". 
When this marker has been accessed on 
all input threads, this marker is returned 
by the multi_queue. 

""" 
import queue 
import threading 

class EndOfQueueMarker: 
    def __str___(self): 
     return "End of data marker" 
    pass 

class queue_reader(threading.Thread): 
    def __init__(self,inq,sharedq): 
     threading.Thread.__init__(self) 
     self.inq = inq 
     self.sharedq = sharedq 
    def run(self): 
     q_run = True 
     while q_run: 
      data = self.inq.get() 
      result = (self.inq,data) 
      self.sharedq.put(result) 
      if data is EndOfQueueMarker: 
       q_run = False 

class multi_queue(queue.Queue): 
    def __init__(self,list_of_queues): 
     queue.Queue.__init__(self) 
     self.qList = list_of_queues 
     self.qrList = [] 
     for q in list_of_queues: 
      qr = queue_reader(q,self) 
      qr.start() 
      self.qrList.append(qr) 
    def get(self,blocking=True,timeout=None): 
     res = [] 
     while len(res)==0: 
      if len(self.qList)==0: 
       res = (self,EndOfQueueMarker) 
      else: 
       res = queue.Queue.get(self,blocking,timeout) 
       if res[1] is EndOfQueueMarker: 
        self.qList.remove(res[0]) 
        res = [] 
     return res 

    def join(self): 
     for qr in self.qrList: 
      qr.join() 

def select(list_of_queues): 
    outq = queue.Queue() 
    for q in list_of_queues: 
     qr = queue_reader(q,outq) 
     qr.start() 
    return outq.get() 

Il codice di seguito è la mia routine di test per mostrare come funziona:

import multiq 
import queue 

q1 = queue.Queue() 
q2 = queue.Queue() 

q3 = multiq.multi_queue([q1,q2]) 

q1.put(1) 
q2.put(2) 
q1.put(3) 
q1.put(4) 
q1.put(multiq.EndOfQueueMarker) 
q2.put(multiq.EndOfQueueMarker) 
res=0 
have_data = True 
while have_data: 
    res = q3.get()[1] 
    print ("returning result =",res) 
    have_data = not(res==multiq.EndOfQueueMarker) 
-2

Non pratico.

Inserire un'intestazione nei messaggi e inviarli a una coda comune. Questo semplifica il codice e sarà più pulito nel complesso.