2014-10-22 11 views
7

Ho bisogno di parallelizzare alcune attività in un programma C++ e sono completamente nuovo alla programmazione parallela. Ho fatto alcuni progressi attraverso le ricerche su internet finora, ma sono un po 'bloccato ora. Mi piacerebbe riutilizzare alcuni thread in un ciclo, ma chiaramente non so come fare quello che sto cercando.Riutilizzo del thread in loop C++

Sto acquisendo i dati da due schede ADC sul computer (acquistate in parallelo), quindi ho bisogno di eseguire alcune operazioni sui dati raccolti (elaborati in parallelo) durante la raccolta della successiva serie di dati. Ecco alcuni pseudocodice per illustrare

//Acquire some data, wait for all the data to be acquired before proceeding 
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a); 
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a); 
acq1.join(); 
acq2.join(); 

while(user doesn't interrupt) 
{ 

//Process first batch of data while acquiring new data 
std::thread proc1(ProcessData,memoryAddress1a); 
std::thread proc2(ProcessData,memoryAddress2a); 
acq1(AcquireData, boardHandle1, memoryAddress1b); 
acq2(AcquireData, boardHandle2, memoryAddress2b); 
acq1.join(); 
acq2.join(); 
proc1.join(); 
proc2.join(); 
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/ 
} 

Questo è l'essenza principale di esso. La prossima esecuzione del ciclo scriverà agli indirizzi di memoria "a" durante l'elaborazione dei dati "b" e continuerà ad alternarsi (posso ottenere il codice per farlo, l'ho appena tolto per evitare di ingombrare il problema).

In ogni caso, il problema (come sono sicuro che alcune persone possono già dire) è che la seconda volta che cerco di usare acq1 e acq2, il compilatore (VS2012) dice "IntelliSense: chiamata di un oggetto di un tipo di classe senza operatore appropriato() o funzioni di conversione a tipo puntatore-a-funzione ". Allo stesso modo, se metto di nuovo std :: thread davanti a acq1 e acq2, si dice "errore C2374: 'acq1': ridefinizione, inizializzazione multipla".

Quindi la domanda è: posso riassegnare i thread a una nuova attività quando hanno completato l'attività precedente? Aspetto sempre che l'uso precedente del thread termini prima di chiamarlo di nuovo, ma non so come riassegnare il thread, e dato che è in un ciclo, non posso creare un nuovo thread ogni volta (o se potrebbe, che sembra uno spreco e inutile, ma potrei sbagliarmi).

Grazie in anticipo

risposta

0

Questo

std::thread acq1(...) 

è la chiamata di un costruttore. costruzione di un nuovo oggetto chiamato acq1

Questo

acq1(...) 

è l'applicazione dell'operatore() sulla aqc1 oggetto esistente. Se non esiste un operatore definito per std :: thread, il compilatore si lamenta.

Per quanto ne so, non è possibile riutilizzare std :: threads. Costruisci e avvialo. Unisciti a loro e buttale via,

+0

Grazie a Oncaphillis, tra la tua risposta e quella di luk32, penso di rendermi conto di quale fosse il mio errore di sintassi. – notaCSmajor

0

Beh, dipende se si considera lo spostamento di una riassegnazione o meno. Puoi spostare una discussione ma non farne una copia.

Il codice seguente creerà una nuova coppia di thread ogni iterazione e li sposterà al posto di thread precedenti. Immagino che questo dovrebbe funzionare, perché i nuovi oggetti thread saranno temporanei.

while(user doesn't interrupt) 
{ 
//Process first batch of data while acquiring new data 
std::thread proc1(ProcessData,memoryAddress1a); 
std::thread proc2(ProcessData,memoryAddress2a); 
acq1 = std::thread(AcquireData, boardHandle1, memoryAddress1b); 
acq2 = std::thread(AcquireData, boardHandle2, memoryAddress2b); 
acq1.join(); 
acq2.join(); 
proc1.join(); 
proc2.join(); 
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/ 
} 

Cosa sta succedendo a dire, l'oggetto in realtà non finisce è vita alla fine del ciclo, perché è stata dichiarata nel perimetro esterno per quanto riguarda il ciclo. Ma un nuovo oggetto viene creato ogni volta e viene eseguito move. Non vedo ciò che può essere risparmiato (potrei essere stupido), quindi immagino che sia esattamente lo stesso di dichiarare acq s all'interno del ciclo e semplicemente riusare il simbolo. Tutto sommato ... sì, si tratta di come classificare una creazione temporanea e move.

Inoltre, questo avvia chiaramente un nuovo thread per ogni ciclo (ovviamente terminando il thread precedentemente assegnato), non fa in modo che un thread attenda nuovi dati e lo invii magicamente alla pipe di elaborazione. Dovresti implementarlo in modo diverso. E.g: thread dei thread di lavoro e comunicazione sulle code.

Riferimenti: operator=, (ctor).

Penso che gli errori che ottieni siano auto-esplicativi, quindi salterò la spiegazione.

+0

Grazie, ho provato questo e sembra funzionare come necessario. Il tempo dirà se ci sono problemi con esso, ma per ora sta aiutando considerevolmente! – notaCSmajor

9

La classe std::thread è progettata per eseguire esattamente un compito (quello che gli viene dato nel costruttore) e quindi terminare. Se vuoi fare più lavoro, avrai bisogno di una nuova discussione. A partire da C++ 11, questo è tutto ciò che abbiamo. I pool di thread non sono stati inclusi nello standard. (Non sono sicuro di ciò che C++ 14 ha da dire su di loro.)

Fortunatamente, è possibile implementare facilmente la logica richiesta. Ecco l'immagine di grandi dimensioni:

  • Inizio n thread di lavoro che tutti fare quanto segue:
    • Ripetere mentre non v'è più lavoro da fare:
      • Afferra il prossimo compito t (probabilmente aspettando che uno sia pronto).
      • Processo t.
  • Conservare l'inserimento di nuovi compiti nella coda di elaborazione.
  • Racconta ai thread di lavoro che non c'è nient'altro da fare.
  • Attendere il completamento dei thread di lavoro.

La parte più difficile qui (che è ancora abbastanza semplice) sta progettando correttamente la coda di lavoro. Di solito, una lista collegata sincronizzata (dalla STL) farà per questo. Sincronizzato significa che qualsiasi thread che desideri manipolare la coda deve farlo solo dopo aver acquisito uno std::mutex in modo da evitare condizioni di gara. Se un thread di lavoro trova l'elenco vuoto, deve attendere fino a quando non si torna a lavorare. È possibile utilizzare uno std::condition_variable per questo. Ogni volta che una nuova attività viene inserita nella coda, il thread di inserimento notifica a un thread che attende la variabile di condizione e quindi interromperà il blocco ed eventualmente inizierà l'elaborazione della nuova attività.

La seconda parte non banale è come segnalare ai thread di lavoro che non c'è più lavoro da fare. Chiaramente, è possibile impostare alcune bandiere globali, ma se un lavoratore è bloccato in attesa in coda, non si realizzerà presto. Una soluzione potrebbe essere quella dei thread notify_all() e farli controllare il flag ogni volta che vengono notificati. Un'altra opzione è inserire un elemento distinto "tossico" nella coda. Se un lavoratore incontra questo oggetto, si chiude da solo.

Rappresentare una coda di attività è semplice utilizzando gli oggetti task o semplicemente lambda.

Tutte le caratteristiche di cui sopra sono C++ 11. Se sei bloccato con una versione precedente, dovrai ricorrere a librerie di terze parti che forniscono multi-threading per la tua piattaforma specifica.

Mentre nessuno di questi è scienza missilistica, è ancora facile sbagliarsi la prima volta. E sfortunatamente, i bug relativi alla concorrenza sono tra i più difficili da eseguire. Cominciare spendendo alcune ore a leggere le sezioni pertinenti di un buon libro o lavorare attraverso un tutorial può rapidamente ripagare.

+0

Grazie per questa risposta, molto approfondita e ben scritta. Ho intenzione di salvare questo come piano B se la risposta di luk32 qui sotto non funziona per me. Il tuo è probabilmente il modo "giusto" per fare le cose. – notaCSmajor

15

Il modo più semplice è utilizzare una coda di attesa degli oggetti std::function. Come questo:

#include <iostream> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <queue> 
#include <functional> 
#include <chrono> 


class ThreadPool 
{ 
    public: 

    ThreadPool (int threads) : shutdown_ (false) 
    { 
     // Create the specified number of threads 
     threads_.reserve (threads); 
     for (int i = 0; i < threads; ++i) 
      threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i)); 
    } 

    ~ThreadPool() 
    { 
     { 
      // Unblock any threads and tell them to stop 
      std::unique_lock <std::mutex> l (lock_); 

      shutdown_ = true; 
      condVar_.notify_all(); 
     } 

     // Wait for all threads to stop 
     std::cerr << "Joining threads" << std::endl; 
     for (auto& thread : threads_) 
      thread.join(); 
    } 

    void doJob (std::function <void (void)> func) 
    { 
     // Place a job on the queu and unblock a thread 
     std::unique_lock <std::mutex> l (lock_); 

     jobs_.emplace (std::move (func)); 
     condVar_.notify_one(); 
    } 

    protected: 

    void threadEntry (int i) 
    { 
     std::function <void (void)> job; 

     while (1) 
     { 
      { 
       std::unique_lock <std::mutex> l (lock_); 

       while (! shutdown_ && jobs_.empty()) 
        condVar_.wait (l); 

       if (jobs_.empty()) 
       { 
        // No jobs to do and we are shutting down 
        std::cerr << "Thread " << i << " terminates" << std::endl; 
        return; 
       } 

       std::cerr << "Thread " << i << " does a job" << std::endl; 
       job = std::move (jobs_.front()); 
       jobs_.pop(); 
      } 

      // Do the job without holding any locks 
      job(); 
     } 

    } 

    std::mutex lock_; 
    std::condition_variable condVar_; 
    bool shutdown_; 
    std::queue <std::function <void (void)>> jobs_; 
    std::vector <std::thread> threads_; 
}; 

void silly (int n) 
{ 
    // A silly job for demonstration purposes 
    std::cerr << "Sleeping for " << n << " seconds" << std::endl; 
    std::this_thread::sleep_for (std::chrono::seconds (n)); 
} 

int main() 
{ 
    // Create two threads 
    ThreadPool p (2); 

    // Assign them 4 jobs 
    p.doJob (std::bind (silly, 1)); 
    p.doJob (std::bind (silly, 2)); 
    p.doJob (std::bind (silly, 3)); 
    p.doJob (std::bind (silly, 4)); 
} 
+0

Bella soluzione KISS senza guarnizioni extra. – Surt

0

Credo che hai bisogno di una risposta molto più semplice per l'esecuzione di una serie di discussioni più di una volta, questa è la soluzione migliore:

do{ 

    std::vector<std::thread> thread_vector; 

    for (int i=0;i<nworkers;i++) 
    { 
     thread_vector.push_back(std::thread(yourFunction,Parameter1,Parameter2, ...)); 
    } 

    for(std::thread& it: thread_vector) 
    { 
     it.join(); 
    } 
    q++; 
} while(q<NTIMES); 
0

È inoltre potrebbe rendere il vostro classe Thread e chiama il suo metodo di esecuzione come:

class MyThread 
{ 
public: 
void run(std::function<void()> func) { 
    thread_ = std::thread(func); 
} 
void join() { 
    if(thread_.joinable()) 
     thread_.join(); 
} 
private: 
    std::thread thread_; 
}; 

// Application code... 
MyThread myThread; 
myThread.run(AcquireData);