2010-01-16 9 views
10

Ho la mia applicazione condivisa tra due thread nella versione std::list<Info> infoList. Questi 2 fili accedono questa lista come segue:qual è il modo migliore per sincronizzare l'accesso del contenitore tra più thread nell'applicazione in tempo reale

filettatura 1: utilizza push_back(), pop_front() o clear() nell'elenco (A seconda della situazione) filettatura 2: utilizza un iterator per scorrere le voci del elencare e fare alcune azioni.

Thread 2 si iterazione della lista come la seguente:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i) 
{ 
    DoAction(i); 
} 

Il codice viene compilato utilizzando GCC 4.4.2.

A volte ++ i causa un segfault e causa l'arresto anomalo dell'applicazione. L'errore è causato in linea std_list.h 143 alla seguente riga:

_M_node = _M_node->_M_next; 

immagino questa è una condizione di corsa. L'elenco potrebbe essere stato modificato o eliminato dal thread 1 mentre il thread 2 lo stava iterando.

Ho usato Mutex per sincronizzare l'accesso a questa lista e tutto è andato bene durante il mio test iniziale. Ma il sistema si blocca solo sotto stress test rendendo questa soluzione totalmente inaccettabile. Questa applicazione è un'applicazione in tempo reale e ho bisogno di trovare una soluzione in modo che entrambi i thread possano funzionare il più velocemente possibile senza danneggiare il throughput totale delle applicazioni.

La mia domanda è questa: Thread 1 e Thread 2 devono essere eseguiti il ​​più rapidamente possibile poiché si tratta di un'applicazione in tempo reale. cosa posso fare per prevenire questo problema e mantenere comunque le prestazioni dell'applicazione? Esistono algoritmi privi di lock per questo tipo di problema?

Va bene se mi mancano alcuni oggetti Info appena aggiunti nell'iterazione di thread 2, ma cosa posso fare per evitare che l'iteratore diventi un puntatore pendente?

Grazie

+1

L'articolo di Sutters sulla scrittura di una coda senza blocco può aiutare: http://www.ddj.com/cpp/210604448 –

+0

http://stackoverflow.com/questions/1724630/how-do-i-build- a-lockless-queue –

+0

Perché non si può usare una coda invece di una lista? In che modo viene comunicata al thread del consumatore i nuovi oggetti Info? –

risposta

4

In generale non è sicuro utilizzare i contenitori STL in questo modo. Dovrai implementare un metodo specifico per rendere sicuro il tuo codice. La soluzione che hai scelto dipende dalle tue esigenze. Probabilmente risolverei questo mantenendo due liste, una per ogni thread. E comunicare le modifiche attraverso un lock free queue (menzionato nei commenti a questa domanda). Puoi anche limitare la durata dei tuoi oggetti Info inserendoli in boost :: shared_ptr, ad es.

typedef boost::shared_ptr<Info> InfoReference; 
typedef std::list<InfoReference> InfoList; 

enum CommandValue 
{ 
    Insert, 
    Delete 
} 

struct Command 
{ 
    CommandValue operation; 
    InfoReference reference; 
} 

typedef LockFreeQueue<Command> CommandQueue; 

class Thread1 
{ 
    Thread1(CommandQueue queue) : m_commands(queue) {} 
    void run() 
    { 
     while (!finished) 
     { 
      //Process Items and use 
      // deleteInfo() or addInfo() 
     }; 

    } 

    void deleteInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Delete; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 

    void addInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Insert; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 
} 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 

class Thread2 
{ 
    Thread2(CommandQueue queue) : m_commands(queue) {} 

    void run() 
    { 
     while(!finished) 
     { 
      processQueue(); 
      processList(); 
     } 
    } 

    void processQueue() 
    { 
     Command command; 
     while (m_commands.consume(command)) 
     { 
      switch(command.operation) 
      { 
       case Insert: 
        m_infoList.push_back(command.reference); 
        break; 
       case Delete: 
        m_infoList.remove(command.reference); 
        break; 
      } 
     } 
    } 

    void processList() 
    { 
     // Iterate over m_infoList 
    } 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 


void main() 
{ 
CommandQueue commands; 

Thread1 thread1(commands); 
Thread2 thread2(commands); 

thread1.start(); 
thread2.start(); 

waitforTermination(); 

} 

Questo non è stato compilato. Devi comunque assicurarti che l'accesso agli oggetti Info sia sicuro per i thread.

1

per evitare che i iteratore venga invalidata si deve chiudere l'intero ciclo for. Ora immagino che il primo thread potrebbe avere difficoltà ad aggiornare l'elenco. Vorrei provare a dargli la possibilità di fare il suo lavoro su ciascuna (o ogni ennesima iterazione).

In pseudo-codice che sarebbe simile:

mutex_lock(); 
for(...){ 
    doAction(); 
    mutex_unlock(); 
    thread_yield(); // give first thread a chance 
    mutex_lock(); 
    if(iterator_invalidated_flag) // set by first thread 
    reset_iterator(); 
} 
mutex_unlock(); 
+0

Devo resettare l'iteratore all'inizio della lista ogni volta su reset_iterator(); ? –

+0

@O. Askari: dipende dall'operazione: 'clear' - ovviamente sì,' push_back' - nessuna necessità di reimpostare. Per quanto riguarda 'pop_front' - potrebbe essere necessario reimpostare (ad esempio quando si è all'inizio dell'elenco), nel qual caso è necessario condividere l'iteratore tra i thread se si desidera evitare reimpostazioni non necessarie. – catwalk

-1

Non credo che si può ottenere via senza alcuna sincronizzazione a tutti in questo caso, come determinata operazione saranno invalidare i iteratori che si sta utilizzando. Con una lista, questo è abbastanza limitato (in pratica, se entrambi i thread stanno cercando di manipolare gli iteratori allo stesso elemento nello stesso momento) ma c'è ancora il rischio che tu rimuova un elemento nello stesso momento in cui stai provando per aggiungerne uno.

Sei per caso in possesso del blocco su DoAction(i)? Ovviamente, vuoi solo tenere il lucchetto per il minimo assoluto di tempo che puoi ottenere per massimizzare le prestazioni. Dal codice sopra penso che ti consigliamo di scomporre il ciclo al fine di accelerare entrambi i lati dell'operazione.

Qualcosa sulla falsariga di:

while (processItems) { 
    Info item; 
    lock(mutex); 
    if (!infoList.empty()) { 
    item = infoList.front(); 
    infoList.pop_front(); 
    } 
    unlock(mutex); 
    DoAction(item); 
    delayALittle(); 
} 

E la funzione di inserimento dovrebbe ancora simile a questa:

lock(mutex); 
infoList.push_back(item); 
unlock(mutex); 

A meno che la coda è probabile che sia massiccia, sarei tentato per usare qualcosa come un std::vector<Info> o anche un std::vector<boost::shared_ptr<Info> > per minimizzare la copia degli oggetti Info (supponendo che questi siano un po 'più costosi da copiare rispetto a un boost :: shared_ptr. In generale, le operazioni su un vettore tendono ad essere un po' più veloci che su una lista, specialmente se gli oggetti memorizzati nel vettore sono piccoli ed economici da copiare.

+0

Grazie, questa soluzione sembra promettente. L'elenco non contiene molti elementi, 20 max e ogni elemento Info occupa circa 128 byte. Tuttavia, cosa succede se ho bisogno di mantenere gli articoli elaborati in InfoList e non rimuoverli? –

1

È necessario decidere quale thread è il più importante. Se si tratta del thread di aggiornamento, è necessario segnalare il thread iteratore per interrompere, attendere e ricominciare. Se è il thread iteratore, può semplicemente bloccare l'elenco fino al termine dell'iterazione.

5

Il ciclo for() può potenzialmente mantenere un blocco per un tempo relativamente lungo, a seconda di quanti elementi itera. Puoi metterti nei guai se "esegue il polling" della coda, controllando costantemente se un nuovo elemento fosse disponibile. Ciò rende il thread proprietario del mutex per un tempo irragionevolmente lungo, dando poche opportunità al thread del produttore di entrare e aggiungere un elemento. E bruciando un sacco di inutili cicli di CPU nel processo.

È necessaria una "coda di blocco limitata". Non scrivere da solo, il design della serratura non è banale. Difficile trovare buoni esempi, la maggior parte è codice .NET. This article sembra promettente.

0

È necessario utilizzare una libreria di threading. Se si utilizza Intel TBB, è possibile utilizzare concurrent_vector o concurrent_queue. Vedi this.

3

Mi piacerebbe sapere qual è lo scopo di questa lista, sarebbe più facile rispondere alla domanda allora.

Come ho detto Hoare, in genere è una cattiva idea provare a condividere i dati per comunicare tra due thread, piuttosto si dovrebbe comunicare per condividere i dati: ad esempio la messaggistica.

Se questo elenco sta modellando una coda, ad esempio, è possibile utilizzare semplicemente uno dei vari modi di comunicare (come i socket) tra i due thread. Il consumatore/produttore è un problema standard e ben noto.

Se i vostri articoli sono costosi, quindi passate solo i puntatori durante la comunicazione, eviterete di copiare gli oggetti stessi.

In generale, è squisitamente difficile condividere i dati, anche se purtroppo è l'unico modo di programmazione di cui sentiamo parlare a scuola.Normalmente solo l'implementazione a basso livello dei "canali" di comunicazione dovrebbe mai preoccuparsi della sincronizzazione e dovresti imparare a usare i canali per comunicare invece di provare a emularli.

+0

+1 - solo per la citazione Hoare se non altro. La programmazione multi-thread non è l'unica risposta a tutti i problemi. –

0

Se si desidera continuare a utilizzare std::list in un ambiente a più thread, è consigliabile racchiuderlo in una classe con un mutex che fornisce accesso bloccato ad esso. A seconda dell'esatto utilizzo, potrebbe essere opportuno passare a un modello di coda basato su eventi in cui i messaggi vengono inoltrati su una coda utilizzata da più thread di lavoro (suggerimento: produttore-consumatore).

Prenderò seriamente in considerazione Matthieu's thought. Molti problemi che vengono risolti utilizzando la programmazione multi-thread sono meglio risolti usando il passaggio di messaggi tra thread o processi. Se hai bisogno di un throughput elevato e non richiedi assolutamente che l'elaborazione condivida lo stesso spazio di memoria, considera l'utilizzo di qualcosa come lo Message-Passing Interface (MPI) invece di lanciare la tua soluzione multi-thread. Ci sono un sacco di C++ implementazioni disponibili - OpenMPI, Boost.MPI, Microsoft MPI, ecc ecc

1

Come lei ha ricordato che non si cura se il consumatore iterazione manca alcune voci appena aggiunto, è possibile utilizzare un copy-on-write lista sotto. Ciò consente al consumatore iteratore di operare su un'istantanea coerente dell'elenco a partire dal momento in cui è stato avviato, mentre, in altri thread, gli aggiornamenti dell'elenco producono copie fresche ma coerenti, senza perturbare nessuno degli snapshot esistenti.

Lo scambio qui è che ogni aggiornamento della lista richiede il blocco per l'accesso esclusivo abbastanza a lungo da copiare l'intera lista. Questa tecnica è influenzata dall'avere molti lettori concorrenti e aggiornamenti meno frequenti.

Provare ad aggiungere il blocco intrinseco al contenitore richiede innanzitutto di pensare a quali operazioni devono essere eseguite nei gruppi atomici. Ad esempio, controllando se la lista è vuota prima di provare a far scattare il primo elemento, è necessaria un'operazione atomica pop-if-not-empty ; in caso contrario, la risposta alla lista vuota può cambiare tra quando il chiamante riceve la risposta e tenta di agire su di essa.

Non è chiaro nell'esempio sopra ciò che garantisce l'iterazione deve obbedire. Deve ogni elemento nella lista alla fine essere visitato dal thread di iterazione? Effettua più passaggi? Cosa significa per un thread rimuovere un elemento dall'elenco mentre su un altro thread è in esecuzione DoAction()? Ho il sospetto che il lavoro attraverso queste domande porterà a significativi cambiamenti di progettazione.


Si sta lavorando in C++, e lei ha citato che necessitano di una coda con un un'operazione pop-se-non-vuoto. Ho scritto un numero two-lock queue molti anni fa usando le primitive della concorrenza di ACE Library, dato che lo Boost thread library non era ancora pronto per l'uso in produzione e la possibilità per la libreria standard C++ di includere tali strutture era un sogno lontano. Portarlo a qualcosa di più moderno sarebbe facile.

Questa coda, chiamata concurrent::two_lock_queue, consente di accedere al capo della coda solo tramite RAII. Ciò garantisce che l'acquisizione della serratura per leggere la testa sia sempre accoppiata con un rilascio della serratura. Un consumatore costruisce un const_front (const access to head element), un front (non-const accede all'elemento head), o un oggetto renewable_front (non-const access head to successor elements) per rappresentare il diritto esclusivo di accesso all'elemento head della coda. Tali oggetti "frontali" non possono essere copiati.

Class two_lock_queue offre anche una funzione pop_front() che attende fino a quando almeno un elemento è disponibile per essere rimosso, ma, in linea con std::queue 's e std::stack' lo stile di non mescolare la mutazione del contenitore e la copia del valore, pop_front() rendimenti nulli s.

In un file guidata, c'è un tipo chiamato concurrent::unconditional_pop, che permette di assicurare mediante RAII che l'elemento della coda viene spuntato all'uscita dall'ambito corrente.

Il file compagno error.hh definisce le eccezioni che derivano da un uso della funzione two_lock_queue::interrupt(), utilizzato per sbloccare thread in attesa per l'accesso alla testa della coda.

Dai un'occhiata al codice e fammi sapere se hai bisogno di ulteriori spiegazioni su come usarlo.

+0

Ogni elemento dell'elenco deve essere elaborato da DoAction(), tuttavia un altro thread può eliminare un elemento durante la sua esecuzione. Questo non farà alcun danno, dal momento che può controllare la validità dell'oggetto Info. L'elenco non può essere svuotato dopo il ciclo da quando verrà utilizzato altrove. Ho usato un'istantanea per l'iteratore ma sto ancora avendo problemi su come fare un atomico pop-if-not-empty. Grazie –

1

Il modo migliore per farlo consiste nell'utilizzare un contenitore che è sincronizzato internamente. TBB e Microsoft concurrent_queue fanno questo. Anthony Williams ha anche una buona implementazione sul suo blog here

1

Altri hanno già suggerito alternative senza blocchi, quindi mi rispondere, come se si stesse utilizzando bloccato le serrature ...

Quando si modifica un elenco gli iteratori esistenti possono essere invalidati perché non puntano più a memoria valida (la lista rialloca automagicamente più memoria quando ha bisogno di crescere). Per evitare iteratori invalidate, si potrebbe effettua il blocco produttore su un mutex mentre il vostro consumatore attraversa la lista, ma che sarebbe inutile in attesa per il produttore.

La vita sarebbe più semplice se si usasse una coda anziché una lista e il consumatore utilizzi una chiamata sincronizzata queue<Info>::pop_front() anziché iteratori che può essere invalidata dietro la propria schiena. Se il tuo consumatore ha davvero bisogno di ingurgitare pezzi di Info alla volta, quindi utilizzare un condition variable che farà bloccare il tuo utente fino a queue.size() >= minimum.

La libreria Boost ha una buona implementazione portatile di variabili di condizione (che funziona anche con versioni precedenti di Windows), nonché usual threading library stuff.

Per una coda di produttore-consumatore che utilizza bloccaggio (vecchio stile), controlla la classe BlockingQueue modello della biblioteca ZThreads. Non ho usato ZThreads me stesso, preoccupato per la mancanza di aggiornamenti recenti e perché non sembra essere ampiamente utilizzato. Tuttavia, l'ho usato come fonte d'ispirazione per far girare la mia coda produttore-consumatore thread-safe (prima che venissi a conoscenza delle code lock-free e TBB).

Una libreria di code/stack bloccata sembra essere nella coda di revisione Boost. Speriamo di vedere un nuovo Boost.Lockfree nel prossimo futuro! :)

Se c'è interesse, posso scrivere su un esempio di una coda di blocco che utilizza std :: coda e Boost bloccaggio del filetto.

EDIT:

Il blog a cui fa riferimento la risposta di Rick ha già una coda di esempio di blocco che utilizza std :: coda e Boost condvars.Se il consumatore ha bisogno di inghiottire pezzi, è possibile estendere l'esempio come segue:

void wait_for_data(size_t how_many) 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     while(the_queue.size() < how_many) 
     { 
      the_condition_variable.wait(lock); 
     } 
    } 

Si consiglia inoltre di modificarlo per consentire time-out e cancellazioni.

Hai detto che la velocità era una preoccupazione. Se i tuoi valori di Info sono di peso massimo, dovresti prendere in considerazione la possibilità di passarli in giro per shared_ptr. Puoi anche provare a creare le dimensioni fisse di Info e usare un memory pool (che può essere molto più veloce dell'heap).

1

Se stai usando C++ 0x si potrebbe sincronizzare internamente lista iterazione in questo modo:

Supponendo che la classe ha un elenco basato su modelli di nome objects_, e un boost :: mutex denominato mutex_

il metodo è un metodo toall membro della lista involucro

void toAll(std::function<void (T*)> lambda) 
{ 
boost::mutex::scoped_lock(this->mutex_); 
for(auto it = this->objects_.begin(); it != this->objects_.end(); it++) 
{ 
     T* object = it->second; 
     if(object != nullptr) 
     { 
       lambda(object); 
      } 
     } 
} 

Calling:

01.235.
Problemi correlati