2015-06-11 10 views
10

I thread x boost funzionano allo stesso tempo. Un thread di produzione riempie una coda sincronizzata con attività di calcolo. I thread dei consumatori compaiono attività e li calcola.Spegnere correttamente i thread di spunto

Synchronised Queue Fonte immagine: https://www.quantnet.com/threads/c-multithreading-in-boost.10028/

L'utente può terminare il programma durante questo processo, quindi ho bisogno di spegnere le mie discussioni correttamente. Il mio attuale approccio sembra non funzionare, poiché vengono lanciate eccezioni. È intuito che durante l'arresto del sistema tutti i processi dovrebbero essere uccisi e interrompere il loro compito corrente, indipendentemente da quello che fanno. Potresti mostrarmi, come potresti uccidere i fili?

inizializzazione Discussione:

for (int i = 0; i < numberOfThreads; i++) 
    { 
     std::thread* thread = new std::thread(&MyManager::worker, this); 
     mThreads.push_back(thread); 
    } 

Distruzione Discussione:

void MyManager::shutdown() 
{ 
    for (int i = 0; i < numberOfThreads; i++) 
    { 
     mThreads.at(i)->join(); 
     delete mThreads.at(i); 
    } 
    mThreads.clear(); 
} 

Worker:

void MyManager::worker() 
{ 
    while (true) 
    { 

     int current = waitingList.pop(); 
     Object * p = objects.at(current); 
     p->calculateMesh(); //this task is internally locked by a mutex 

     try 
     { 
      boost::this_thread::interruption_point(); 
     } 
     catch (const boost::thread_interrupted&) 
     { 
      // Thread interruption request received, break the loop 
      std::cout << "- Thread interrupted. Exiting thread." << std::endl; 
      break; 
     } 
    } 
} 

Sincronizzato Coda:

#include <queue> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 

template <typename T> 
class ThreadSafeQueue 
{ 
public: 

    T pop() 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     while (queue_.empty()) 
     { 
      cond_.wait(mlock); 
     } 
     auto item = queue_.front(); 
     queue_.pop(); 

     return item; 
    } 

    void push(const T& item) 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     queue_.push(item); 
     mlock.unlock(); 
     cond_.notify_one(); 
    } 


    int sizeIndicator() 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     return queue_.size(); 
    } 


private: 

    bool isEmpty() { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     return queue_.empty(); 
    } 

    std::queue<T> queue_; 
    std::mutex mutex_; 
    std::condition_variable cond_; 
}; 

La chiamata errore gettato stack:

... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68 C++ 
... std::_Mutex_base::lock() Line 42 C++ 
... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220 C++ 
... ThreadSafeQueue<int>::pop() Line 13 C++ 
... MyManager::worker() Zeile 178 C++ 
+0

Due cose: isEmpty non è bloccato, e le dimensioni() potrebbe avere un'implementazione più semplice: dopo mutex è bloccato si può semplicemente tornare queue_.size() (e mlock destructor rilascia il mutex) – marom

+0

@marom grazie, corretto il mio codice. L'errore è ancora lì. – Anthea

+0

Due cose: isEmpty e le dimensioni potrebbero non essere pubbliche. Qualsiasi cosa riportino può essere non valida quando valutata dal chiamante. A meno che non siano usati in privato, devono essere rimossi. – stefan

risposta

0

Provare a spostare 'provare' alto (come nell'esempio qui sotto). Se il thread è in attesa di dati (all'interno di waitingList.pop()), potrebbe essere in attesa all'interno della variabile di condizione .wait(). Questo è un 'punto di interruzione' e quindi può essere lanciato quando il thread viene interrotto.

void MyManager::worker() 
{ 
    while (true) 
    { 
     try 
     { 
      int current = waitingList.pop(); 
      Object * p = objects.at(current); 
      p->calculateMesh(); //this task is internally locked by a mutex 

      boost::this_thread::interruption_point(); 
     } 
     catch (const boost::thread_interrupted&) 
     { 
      // Thread interruption request received, break the loop 
      std::cout << "- Thread interrupted. Exiting thread." << std::endl; 
      break; 
     } 
    } 
} 
+0

provato, ma l'errore è ancora lì. Sto aggiornando la domanda con l'elenco delle chiamate di debug del crash. – Anthea

+1

Vorrei avvolgere l'intero ciclo in una clausola try-catch. Qui non cambierà nulla, ma il punto esatto delle eccezioni è che viaggiano attraverso tutte le strutture di controllo del flusso come loop, così da non doverlo fare manualmente. –

-2

Penso che questo sia un classico problema di thread di lettura/scrittura che funziona su un buffer comune. Uno dei modi più sicuri per risolvere questo problema è usare mutex e segnali. (Non sono in grado di postare il codice qui. Per favore mandami una email, vi mando il codice).

0

Forse stai prendendo la classe di eccezione sbagliata? Il che significa che non viene scoperto. Non ho familiarità con i thread ma è il mix di std :: threads e boost :: threads che sta causando questo?

Provare a rilevare l'eccezione genitore più bassa.

3

Dalla mia esperienza nel lavorare con i thread sia in Boost che in Java, cercare di chiudere i thread esternamente è sempre disordinato. Non sono mai riuscito a farlo funzionare in modo pulito.

Il meglio che ho ottenuto è di avere un valore booleano disponibile per tutti i thread di consumo impostato su true. Quando lo imposti su false, i thread torneranno semplicemente da soli. Nel tuo caso, questo potrebbe facilmente essere inserito nel ciclo while.

Inoltre, avrete bisogno di una sincronizzazione in modo da poter attendere il ritorno dei thread prima di cancellarli, altrimenti è possibile ottenere un comportamento difficile da definire.

Un esempio da un progetto passato di mine:

creazione Discussione

barrier = new boost::barrier(numOfThreads + 1); 
threads = new detail::updater_thread*[numOfThreads]; 

for (unsigned int t = 0; t < numOfThreads; t++) { 
    //This object is just a wrapper class for the boost thread. 
    threads[t] = new detail::updater_thread(barrier, this); 
} 

distruzione Discussione

for (unsigned int i = 0; i < numOfThreads; i++) { 
    threads[i]->requestStop();//Notify all threads to stop. 
} 

barrier->wait();//The update request will allow the threads to get the message to shutdown. 

for (unsigned int i = 0; i < numOfThreads; i++) { 
    threads[i]->waitForStop();//Wait for all threads to stop. 
    delete threads[i];//Now we are safe to clean up. 
} 

Alcuni metodi che possono essere di interesse da wrapper thread.

//Constructor 
updater_thread::updater_thread(boost::barrier * barrier) 
{ 
    this->barrier = barrier; 
    running = true; 

    thread = boost::thread(&updater_thread::run, this); 
} 

void updater_thread::run() { 
    while (running) { 
     barrier->wait(); 
     if (!running) break; 

     //Do stuff 

     barrier->wait(); 
    } 
} 

void updater_thread::requestStop() { 
    running = false; 
} 

void updater_thread::waitForStop() { 
    thread.join(); 
}