2012-08-20 4 views
7

Qualcuno conosce un modo pulito per avvicinarsi al comportamento LIFO o persino non vicino a FIFO (ad esempio casuale) da multiprocessing.Queue?Modo pulito per avvicinarsi al comportamento LIFO dal multiprocessing.Queue? (o anche solo * not * near-FIFO)

Domanda alternativa: Qualcuno potrebbe indicarmi il codice per il thread che gestisce la struttura di archiviazione effettiva dietro multiprocessing.Queue? Sembra che sia banale trovare un accesso LIFO, ma mi sono perso nella tana del coniglio cercando di trovarlo.

Note:

  1. credo multiprocessing.Queuedoes not guarantee order. Belle. Ma è vicino-FIFO così vicino-LIFO sarebbe fantastico.
  2. Potrei rimuovere tutti gli elementi correnti dalla coda e invertire l'ordine prima di lavorare con loro, ma preferisco evitare un kludge se possibile.

(edit) Per chiarire: sto facendo una simulazione legata CPU con multiprocessing e quindi non è possibile utilizzare le code specializzati da Queue. Poiché non ho visto alcuna risposta per alcuni giorni, ho aggiunto la domanda alternativa sopra.


Nel caso in cui si tratta di un problema, di seguito è lieve prova che multiprocessing.Queue è vicina-FIFO. Questo dimostra che in un caso semplice (un singolo thread), è perfettamente FIFO sul mio sistema:

import multiprocessing as mp 
import Queue 

q = mp.Queue() 

for i in xrange(1000): 
    q.put(i) 

deltas = [] 
while True: 
    try: 
     value1 = q.get(timeout=0.1) 
     value2 = q.get(timeout=0.1) 
     deltas.append(value2-value1) 
    except Queue.Empty: 
     break 

#positive deltas would indicate the numbers are coming out in increasing order 
min_delta, max_delta = min(deltas), max(deltas) 
avg_delta = sum(deltas)/len(deltas) 

print "min", min_delta 
print "max", max_delta 
print "avg", avg_delta 

stampe: min, max, e media sono esattamente 1 (FIFO perfetta)

+1

test intelligente ... – mgilson

+0

Hai solo bisogno dei dati LIFO dopo che sono state fatte tutte le aggiunte, o vuoi la possibilità di ottenere i dati più recenti mentre vengono aggiunti nuovi valori? Se il primo, penso che invertire il contenuto della coda è più facile. Se si "vivi" l'accesso LIFO, sarà probabilmente necessario scrivere la propria struttura dati utilizzando le primitive della memoria condivisa dal modulo 'multiprocessing'. – Blckknght

+0

@Blckknght Sì, se posso aspettare fino a quando tutto è lì, allora è piuttosto semplice (opzione 2) ma è una simulazione in corso in cui voglio che la coda agisca approssimativamente come una pila. Ho dato una rapida occhiata ai primitivi, sperando di personalizzare il thread di gestione della coda, ma non sono riuscito a farne testa o croce. Indagando che è il mio prossimo passo se non riesco a trovare un modo semplice. Grazie per il commento! – KobeJohn

risposta

2

Ho guardato sulla classe Queue che vive in Lib/multiprocessing/queues.py nella mia installazione di Python (Python 2.7, ma nulla di ovvio è diverso nella versione da Python 3.2 che ho brevemente controllato). Ecco come funziona:

Ci sono due gruppi di oggetti che vengono mantenuti dall'oggetto Coda. Un set è un primativo multiprocesso sicuro condiviso da tutti i processi. Gli altri vengono creati e utilizzati separatamente da ciascun processo.

Il cross-process oggetti sono impostati nel metodo __init__:

  1. Un oggetto Pipe, chi è due estremità sono salvati come self._reader e self._writer.
  2. Un oggetto BoundedSemaphore, che conteggia (e facoltativamente limiti) quanti oggetti sono in coda.
  3. Un oggetto Lock per la lettura del tubo, e su piattaforme non Windows un altro per la scrittura.(Ritengo che questo è perché scrivere ad un tubo è intrinsecamente multiprocesso-safe su Windows.)

Gli oggetti per processo sono impostati nei metodi _after_fork e _start_thread:

  1. Un oggetto collections.deque usato per bufferizzare le scritture sul tubo.
  2. Un oggetto threading.condition utilizzato per segnalare quando il buffer non è vuoto.
  3. Un oggetto threading.Thread che esegue la scrittura effettiva. È creato pigramente, quindi non esisterà fino a quando almeno una scrittura alla coda è stata richiesta in un dato processo.
  4. Vari oggetti Finalize che puliscono le cose quando il processo finisce.

A get dalla coda è piuttosto semplice. Acquisisci il blocco di lettura, decrementa il semaforo e prendi un oggetto dall'estremità letta del tubo.

A put è più complicato. Usa più thread. Il chiamante su put afferra il blocco della condizione, quindi aggiunge il suo oggetto al buffer e segnala la condizione prima di sbloccarla. Aumenta anche il semaforo e avvia il thread writer se non è ancora in esecuzione.

Il thread del writer scorre per sempre (fino all'annullamento) nel metodo . Se il buffer è vuoto, attende la condizione notempty. Quindi prende un oggetto dal buffer, acquisisce il blocco di scrittura (se esiste) e scrive l'elemento sul tubo.


Quindi, dato tutto ciò, è possibile modificarlo per ottenere una coda LIFO? Non sembra facile. I pipe sono intrinsecamente oggetti FIFO e mentre la coda non può garantire il comportamento FIFO nel suo complesso (a causa della natura asincrona delle scritture da più processi) sarà sempre in gran parte FIFO.

Se si dispone di un solo utente, è possibile importare gli oggetti get dalla coda e aggiungerli allo stack locale del processo. Sarebbe più difficile fare uno stack multi-consumer, anche se con una memoria condivisa uno stack di dimensioni limitate non sarebbe troppo difficile. Avresti bisogno di un lock, una coppia di condizioni (per il blocco/segnalazione su stati pieno e vuoto), un valore intero condiviso (per il numero di valori contenuti) e un array condiviso di un tipo appropriato (per i valori stessi).

+0

Quando riuscirò a trovare il tempo per lavorare su questo, seguirò il tuo eccellente tour attraverso il file m.Queue e vedrò cosa riesco a vedere.Speravo che i dati fossero conservati in un oggetto che sarebbe stato facile "popleft" ma vedo che è una pipa come hai detto tu. In ogni caso, immagino che la tua spiegazione qui sia la migliore su internet per come funziona mp.Queue! Grazie mille. – KobeJohn

1

C'è a LIFO queue nel pacchetto Queue (coda in Python 3). Questo non è esposto nei moduli multiprocessing o multiprocessing.queues.

Sostituzione della linea q = mp.Queue() con q = Queue.LifoQueue() e stampe in esecuzione: min, max e media esattamente come -1.

(Anche io penso che si dovrebbe sempre ottenere esattamente ordine FIFO/LIFO quando ottenere elementi da un solo thread.)

+0

Grazie per il suggerimento. Sfortunatamente sto facendo una simulazione legata alla CPU che usa effettivamente il multiprocessing. Per quanto ne so, la coda standard non può gestire i processi. Aggiornerò la mia domanda con alcuni dettagli a riguardo. – KobeJohn

Problemi correlati