2012-02-27 12 views
5

Sto implementando un concurrent_blocking_queue con funzioni minime:Modi eleganti per informare il consumatore quando il produttore ha finito?

//a thin wrapper over std::queue 
template<typename T> 
class concurrent_blocking_queue 
{ 
    std::queue<T> m_internal_queue; 
    //... 
public: 
    void add(T const & item); 
    T& remove(); 
    bool empty(); 
}; 

ho intenzione di usare questo per producer-consumer problem (immagino, è dove si usa strutture di tali dati?). Ma sono bloccato su un problema che è:

Come informare elegantemente il consumatore quando il produttore ha finito? Come fa il produttore a notificare la coda quando viene eseguita? Chiamando una funzione membro specifica, ad esempio done()? È una buona idea lanciare un'eccezione dalla coda (ad esempio dalla funzione remove)?

Mi sono imbattuto in molti esempi, ma tutto ha un ciclo infinito come se il produttore produrrà oggetti per sempre. Nessuno ha discusso il problema delle condizioni di arresto, nemmeno lo wiki article.

+0

Spesso devi usare un valore speciale del tipo 'T', chiamare' QuitMessage' o qualcosa del genere. Ma presumibilmente vuoi costruire questo in coda, indipendentemente dal tipo, piuttosto che lasciare il produttore e il consumatore a definire un protocollo ad un livello più alto? –

+0

@SteveJessop: Sì. Voglio renderlo generico. L'idea del valore speciale non sembra essere buona. Il valore molto speciale potrebbe essere un valore valido per alcuni 'T', o il mio sentimento dice che non è così buono. – Nawaz

+0

Sto proponendo che il produttore selezioni e documenti il ​​valore speciale. Se usano un valore che ha anche un altro significato, allora sono probabilmente stupidi o qualcosa del genere. –

risposta

1

miei code sono di solito usato i puntatori (con una std::auto_ptr nell'interfaccia , per indicare chiaramente che il mittente non può accedere al puntatore ); per la maggior parte, gli oggetti in coda erano polimorfici, quindi l'allocazione dinamica e la semantica di riferimento erano comunque necessari. Altrimenti, non dovrebbe essere troppo difficile aggiungere un flag “ di file alla coda. Avresti bisogno di una funzione speciale sul lato produttore (close?) Per impostarlo (usando esattamente lo stesso blocco delle primitive come quando scrivi in ​​coda), e il ciclo nella funzione di rimozione deve attendere essere lì, o la coda da chiusa. Ovviamente, è necessario restituire un valore Fallible, in modo che il lettore possa sapere se la lettura ha avuto esito positivo oppure no. Inoltre, non chiamare dimenticando che in questo caso è necessario un notify_all per assicurarsi che tutti i processi in attesa sulla condizione vengano risvegliati.

BTW: Non vedo come la tua interfaccia sia implementabile. Cosa fa il T& restituito da remove fare riferimento a. In sostanza, remove deve essere qualcosa di simile:

Fallible<T> 
MessageQueue<T>::receive() 
{ 
    ScopedLock l(myMutex); 
    while (myQueue.empty() && ! myIsDone) 
     myCondition.wait(myMutex); 
    Fallible<T> results; 
    if (!myQueue.empty()) { 
     results.validate(myQueue.top()); 
     myQueue.pop(); 
    } 
    return results; 
} 

Anche senza la condizione myIsDone, si deve leggere il valore in una variabile locale prima di rimuoverlo dalla coda, e non si può restituire un riferimento a una variabile locale.

Per il resto:

void 
MessageQueue<T>::send(T const& newValue) 
{ 
    ScopedLock l(myMutex); 
    myQueue.push(newValue); 
    myCondition.notify_all(); 
} 

void 
MessageQueue<T>::close() 
{ 
    ScopedLock l(myMutex); 
    myIsDone = true; 
    myCondition.notify_all(); 
} 
+0

Nella tua implementazione 'receive()', non è il ciclo 'while' bloccato per sempre, poiché il mutex non sarà mai acquisito in' send' (o 'close') perché è già acquisito da' receive' che non ha Rilasciarlo * prima di andare a dormire? – Nawaz

+0

@Nawaz Chiama 'myCond.wait()', che lo sblocca. In effetti, c'è un errore probabile, o almeno qualcosa di fuorviante, nel codice, dal momento che tutte le funzioni 'wait()' su una condizione che so di prendere un mutex come argomento pure.Il punto è che devi tenere premuto il mutex fino a quando non stai effettivamente aspettando, per evitare qualsiasi altro processo che intervenga tra il momento in cui rilasci il mutex e il momento in cui inizi ad aspettare. 'myCond.wait()' dovrebbe rilasciare atomicamente il mutex e causare l'attesa. Modificherò la risposta per riflettere questo. –

+0

Non ho capito il motivo per cui hai detto che la funzione 'remove' dovrebbe restituire' Fallible 'piuttosto che' T & '? Cos'è "Fallibile"? – Nawaz

5

Ho appena introdotto un prodotto "fatto" fittizio in passato. Quindi se il produttore può creare "prodotti", ad esempio, tipo A e tipo B, ho inventato il tipo "fatto". Quando un consumatore incontra un prodotto di tipo "fatto", sa che l'ulteriore elaborazione non è più richiesta.

+0

+1, sempre un buon approccio. – Tudor

+0

"* un prodotto di ** tipo **" fatto "*"? Cosa significa? Se intendi un valore speciale, allora quel valore speciale potrebbe essere un valore valido per alcuni 'T'. Dì, se 'T = int', quale valore speciale sceglierei per indicare' done'? Si potrebbe dire qualche valore 'N', quindi potrebbe essere che' N' è uno dei valori che il produttore produrrà, comunque? – Nawaz

+0

Si può introdurre un tipo di proxy M che implementa un metodo di try_get bool (T &) e restituisce false nel caso se questo è il 'fine dei dati' di notifica –

2

È vero che è comune accodare un messaggio speciale "abbiamo finito"; tuttavia penso che il desiderio originale di OP per un indicatore fuori banda sia ragionevole. Guarda la complessità che le persone stanno pensando di creare un messaggio di completamento in-band! Tipi di proxy, modelli; buon dolore. Direi che un metodo done() è più semplice e più semplice, e rende il caso comune (non ancora finito) più veloce e più pulito.

Sono d'accordo con kids_fox che un try_remove che restituisce un codice di errore se la coda è finita è preferibile, ma è stilistico e YMMV.

Edit: punti bonus per l'implementazione di una coda che tiene traccia del numero di produttori stanno rimanendo in una situazione più produttori e solleva l'eccezione fatta se e solo se tutti i produttori hanno gettato la spugna ;-) Non andare a fare che con messaggi in-band!

+0

Non sono d'accordo. Per cominciare, come potrebbe una coda tenere traccia di quanti produttori rimangono - la coda non sa quanti produttori ci sono in un sistema. –

+0

@MartinJames: la coda può essere implementata in questo modo in modo da tenere traccia dei produttori? Qualcosa come 'queue.register_producer (this)' e quindi 'queue.unregister_producer (this)'? – Nawaz

+0

@Nawaz - sì, questo è possibile, ma limita la flessibilità e incide l'incapsulamento - significa che il produttore deve avere accesso diretto alla classe della coda, cosa che spesso non è desiderata e/o complessa da organizzare. Cosa succede se un produttore non effettua l'accesso o effettua l'accesso più di una volta? Se mi registro con la coda e in seguito non desidero continuare, devo annullare la registrazione e, in caso affermativo, devo garantire che tutti gli oggetti in sospeso vengano elaborati? –

1

'Arresto' non viene spesso discusso perché spesso non viene mai eseguito. Nei casi in cui è richiesto, spesso è altrettanto semplice e flessibile accodare una pillola avvelenata utilizzando il protocollo P-C di livello superiore, così come lo è creare funzionalità extra nella coda stessa.

Se si vuole veramente fare questo, si può effettivamente impostare un flag che fa sì che ogni consumatore generi un'eccezione, "immediatamente" o ogni volta che ritorna in coda, ma ci sono problemi. Hai bisogno del metodo 'done' per essere sincrono, es. vuoi che tutti i consumatori se ne siano andati entro i ritorni "fatti" o asincroni, ad es. l'ultimo thread del consumatore chiama un parametro evento quando tutti gli altri utenti se ne sono andati?

Come stai andando a organizzare per quei consumatori che sono attualmente in attesa di svegliarsi? Quanti ne stanno aspettando e quanti ne sono occupati, ma torneranno in coda quando avranno fatto il loro lavoro? Cosa succede se uno o più consumatori sono bloccati in una chiamata bloccante, (forse possono essere sbloccati, ma ciò richiede una chiamata da un altro thread - come hai intenzione di farlo)?

Come faranno sapere i consumatori che hanno gestito la loro eccezione e stanno per morire? Sta per morire abbastanza, o è necessario attendere il thread handle? Se è necessario attendere l'handle del thread, che cosa deve fare l'attesa: il thread che richiede l'arresto della coda o l'ultimo thread del consumatore da notificare?

Oh sì, per essere sicuri, è necessario organizzare i thread di produzione che vengono visualizzati con gli oggetti da accodare mentre si è in "chiusura" per generare un'eccezione.

Sollevo queste domande perché ho fatto tutto questo una volta, tanto tempo fa. Alla fine, tutto ha funzionato-ish. Gli oggetti in coda dovevano avere un 'QueuedItem' inserito nella loro catena di ereditarietà (in modo che un metodo di annullamento del lavoro potesse essere esposto alla coda) e la coda doveva tenere un elenco sicuro di oggetti che era stato saltato da thread, ma non ancora elaborato.

Dopo un po ', ho smesso di usare la classe in favore di una semplice coda P-C senza alcun meccanismo speciale di spegnimento.

+0

Il punto sul risveglio dei consumatori che stanno già aspettando è buono; il più semplice sarebbe incrementare il semaforo (presumo) che sta proteggendo la coda. OP riguardava il singolo produttore singolo consumatore, quindi è sufficiente. La maggior parte della complessità che si discute è rispetto alla creazione di done() sincrono, quindi, quindi se ciò non è necessario, la classe non è male da scrivere. – zmccord

+0

In realtà, forse abbiamo interpretato il post originale in modo diverso. Stavo pensando che OP voleva dire ai consumatori che non sarebbero mai arrivati ​​più dati se avessero provato a rimuovere dalla coda dopo che l'ultimo oggetto era stato rimosso. Ho capito che leggi l'OP come un modo per la coda per dire ai consumatori di terminare l'elaborazione prima di elaborare tutti gli oggetti esistenti nella coda? Quest'ultimo sembra più difficile da pulire. – zmccord

+0

@zmccord - il post OP era un po 'poco chiaro. 'single-consumer single-producer' - sì, OK, ma queste code non sono molto utili/flessibili e alquanto pericolose - non importa quanta documentazione si metta, alcuni sviluppatori finiranno per inviare un messaggio da un altro thread. Le code "reali" P-C sono incondizionatamente sicure. Se il 'done' non ha bisogno di essere sincrono e non è richiesta alcuna notifica di completamento, sì, basta spingere un oggetto nullo/nullo/qualsiasi su quello illegale e fare in modo che ogni thread consumatore lo faccia fuori, riconoscerlo, reinserirlo e poi muori. –

Problemi correlati