2013-03-07 11 views
37

Un progetto su cui sto lavorando utilizza più thread per lavorare su una raccolta di file. Ogni thread può aggiungere file all'elenco dei file da elaborare, quindi ho messo insieme (quello che pensavo fosse) una coda thread-safe. parti rilevanti seguono:C++ 11 coda thread-safe

// qMutex is a std::mutex intended to guard the queue 
// populatedNotifier is a std::condition_variable intended to 
//     notify waiting threads of a new item in the queue 

void FileQueue::enqueue(std::string&& filename) 
{ 
    std::lock_guard<std::mutex> lock(qMutex); 
    q.push(std::move(filename)); 

    // Notify anyone waiting for additional files that more have arrived 
    populatedNotifier.notify_one(); 
} 

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout) 
{ 
    std::unique_lock<std::mutex> lock(qMutex); 
    if (q.empty()) { 
     if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) { 
      std::string ret = q.front(); 
      q.pop(); 
      return ret; 
     } 
     else { 
      return std::string(); 
     } 
    } 
    else { 
     std::string ret = q.front(); 
     q.pop(); 
     return ret; 
    } 
} 

Tuttavia, sono occasionalmente segfault all'interno del blocco if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } e ispezione gdb indica che i segfaults si verificano perché la coda è vuota. Com'è possibile? Sono stato a conoscenza del fatto che wait_for restituisce solo cv_status::no_timeout quando è stato notificato, e ciò dovrebbe avvenire solo dopo che FileQueue::enqueue ha appena inserito un nuovo elemento nella coda.

+2

Qui si va: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html – GManNickG

+1

domanda, perché stai prendendo 'filename' con ref-ref? Non riesco a vedere alcun motivo per questo qui> –

+1

@TonyTheLion Generalmente in C++ è più efficiente passare oggetti per riferimento piuttosto che fare una copia. In questo caso sto anche usando la semantica del movimento, che consente al compilatore di spostare il contenuto della stringa in coda invece di crearne un'altra copia. –

risposta

23

Secondo lo standard condition_variables, è possibile attivare spuriemente, anche se l'evento non si è verificato. In caso di sveglia spuria restituirà cv_status::no_timeout (poiché si è svegliato invece di scadere), anche se non è stato notificato. La soluzione corretta per questo è, naturalmente, per verificare se il wakeup era effettivamente legittimo prima di procedere.

I dettagli sono specificati nella norma §30.5.1 [thread.condition.condvar]:

-La funzione sbloccherà quando viene segnalato da una chiamata a notify_one(), una chiamata a notify_all() , scadenza del timeout assoluto (30.2.4) specificato da abs_time o spurio.

...

Returns: cv_status :: timeout se il timeout assoluto (30.2.4) speci fi edby abs_time scaduto, altri-ise cv_status :: no_timeout.

+1

E come suggeriresti di farlo? Solo controllando se la coda è di nuovo vuota? –

+4

Addendum: Potrei probabilmente proteggere da wakeups spurie usando un predicato come quello [qui] (http://en.cppreference.com/w/cpp/thread/condition_variable/wait_for). –

+1

Sì. Si chiama variabile _condition_ perché è associata ad alcune condizioni, che è necessario verificare in realtà. Nel tuo caso la condizione da verificare è '! Q.empty()' –

11

Questo è probabilmente il modo si dovrebbe fare:

void push(std::string&& filename) 
{ 
    { 
     std::lock_guard<std::mutex> lock(qMutex); 

     q.push(std::move(filename)); 
    } 

    populatedNotifier.notify_one(); 
} 

bool try_pop(std::string& filename, std::chrono::milliseconds timeout) 
{ 
    std::unique_lock<std::mutex> lock(qMutex); 

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); })) 
     return false; 

    filename = std::move(q.front()); 
    q.pop(); 

    return true;  
} 
+0

Grazie! Ho finito per fare qualcosa di simile. –

+0

Due commenti su quel codice altrimenti buono: 1) prima di notify_one, vorrei sbloccare il mutex per ragioni trovate su http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one 2) il processo di attesa può essere risvegliato spurio, quindi aggiungerei anche una variabile bool che indica che la spinta è effettivamente superiore a – IceFire

+1

1) Ottimizzazione secondaria. 2) Questo sovraccarico di 'wair_for' gestisce le scia spurie attraverso il secondo argomento. – ronag

4

vorrei riscrivere la funzione dequeue come:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout) 
{ 
    std::unique_lock<std::mutex> lock(qMutex); 
    while(q.empty()) { 
     if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout) 
      return std::string(); 
    } 
    std::string ret = q.front(); 
    q.pop(); 
    return ret; 
} 

è più breve e non ha il codice duplicato come la vostra ha fatto. Solo il rilascio può attendere più a lungo il timeout. Per evitare che sia necessario ricordare l'ora di inizio prima del ciclo, controllare il timeout e regolare di conseguenza il tempo di attesa. O specificare il tempo assoluto in condizione di attesa.

27

Basta guardarlo, quando si controlla una variabile di condizione è meglio usare un ciclo while (in modo che se si sveglia e non sia ancora valido si ricontrolla). Ho appena scritto un modello per una coda asincrona, spero che questo aiuti.

#ifndef SAFE_QUEUE 
#define SAFE_QUEUE 

#include <queue> 
#include <mutex> 
#include <condition_variable> 

// A threadsafe-queue. 
template <class T> 
class SafeQueue 
{ 
public: 
    SafeQueue(void) 
    : q() 
    , m() 
    , c() 
    {} 

    ~SafeQueue(void) 
    {} 

    // Add an element to the queue. 
    void enqueue(T t) 
    { 
    std::lock_guard<std::mutex> lock(m); 
    q.push(t); 
    c.notify_one(); 
    } 

    // Get the "front"-element. 
    // If the queue is empty, wait till a element is avaiable. 
    T dequeue(void) 
    { 
    std::unique_lock<std::mutex> lock(m); 
    while(q.empty()) 
    { 
     // release lock as long as the wait and reaquire it afterwards. 
     c.wait(lock); 
    } 
    T val = q.front(); 
    q.pop(); 
    return val; 
    } 

private: 
    std::queue<T> q; 
    mutable std::mutex m; 
    std::condition_variable c; 
}; 
#endif 
+1

Grazie! Per fortuna ho già risolto il problema utilizzando i predicati come descritto [qui] (http://en.cppreference.com/w/cpp/thread/condition_variable/wait_for). –

+0

La soluzione più semplice ed elegante del lotto, IMHO. –

+0

Ottima soluzione! Grazie per la condivisione. Una domanda però. Come gestisci la situazione in cui la coda è piena? Dobbiamo considerarlo mentre progettiamo una coda? – Sarah

7

Aggiungendo alla risposta accettata, direi che l'attuazione di una corretta coda a più produttori/multi-consumatori è difficile (più facile visto che il C++ 11, però)

Ti suggerisco di provare il (molto buono) lock free boost library, la struttura "coda" farà quello che vuoi, con le garanzie di assenza di attesa/blocco e without the need for a C++11 compiler.

sto aggiungendo questa risposta oggi perché la libreria senza blocchi è abbastanza nuovo per aumentare (dal 1,53 credo)

+1

Grazie per aver segnalato quella libreria. Al momento, tuttavia, non sembra esserci documentazione per la coda. Qualche idea su dove può essere trovato? –