2012-06-27 13 views
6

Stackoverflow è stato di grande aiuto per me e dovevo restituire qualcosa alla comunità. Ho implementato un threadpool semplice utilizzando la libreria di thread portatile C++ TinyThread ++ website, utilizzando ciò che ho imparato da Stackoverflow. Sono nuovo di infilare la programmazione, in modo non molto comodo con mutex, ecc Ho una domanda migliore chiesto dopo aver presentato il codice (che funziona piuttosto bene sotto Linux):Query sulla semplice implementazione del threadpool C++

// ThreadPool.h 

class ThreadPool 
{ 
public: 

ThreadPool(); 
~ThreadPool(); 

// Creates a pool of threads and gets them ready to be used 
void CreateThreads(int numOfThreads); 

// Assigns a job to a thread in the pool, but doesn't start the job 
// Each SubmitJob call will use up one thread of the pool. 
// This operation can only be undone by calling StartJobs and 
// then waiting for the jobs to complete. On completion, 
// new jobs may be submitted. 
void SubmitJob(void (*workFunc)(void *), void *workData); 

// Begins execution of all the jobs in the pool. 
void StartJobs(); 

// Waits until all jobs have completed. 
// The wait will block the caller. 
// On completion, new jobs may be submitted. 
void WaitForJobsToComplete(); 

private: 

enum typeOfWorkEnum { e_work, e_quit }; 

class ThreadData 
{ 
    public: 

    bool ready; // thread has been created and is ready for work 
    bool haveWorkToDo; 
    typeOfWorkEnum typeOfWork; 

    // Pointer to the work function each thread has to call. 
    void (*workFunc)(void *); 

    // Pointer to work data 
    void *workData; 

    ThreadData() : ready(false), haveWorkToDo(false) { }; 
}; 

struct ThreadArgStruct 
{ 
    ThreadPool *threadPoolInstance; 
    int   threadId; 
}; 

// Data for each thread 
ThreadData *m_ThreadData; 

ThreadPool(ThreadPool const&); // copy ctor hidden 
ThreadPool& operator=(ThreadPool const&); // assign op. hidden 

// Static function that provides the function pointer that a thread can call 
// By including the ThreadPool instance in the void * parameter, 
// we can use it to access other data and methods in the ThreadPool instance. 
static void ThreadFuncWrapper(void *arg) 
{ 
    ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg); 
    threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId); 
} 

// The function each thread calls  
void ThreadFunc(int threadId); 

// Called by the thread pool destructor 
void DestroyThreadPool(); 

// Total number of threads available 
// (fixed on creation of thread pool) 
int m_numOfThreads; 
int m_NumOfThreadsDoingWork; 
int m_NumOfThreadsGivenJobs; 

// List of threads 
std::vector<tthread::thread *> m_ThreadList; 

// Condition variable to signal each thread has been created and executing 
tthread::mutex    m_ThreadReady_mutex; 
tthread::condition_variable m_ThreadReady_condvar; 

// Condition variable to signal each thread to start work 
tthread::mutex    m_WorkToDo_mutex; 
tthread::condition_variable m_WorkToDo_condvar; 

// Condition variable to signal the main thread that 
// all threads in the pool have completed their work 
tthread::mutex    m_WorkCompleted_mutex; 
tthread::condition_variable m_WorkCompleted_condvar; 
}; 

di file cpp:

// 
// ThreadPool.cpp 
// 

#include "ThreadPool.h"  

// This is the thread function for each thread. 
// All threads remain in this function until 
// they are asked to quit, which only happens 
// when terminating the thread pool. 
void ThreadPool::ThreadFunc(int threadId) 
{ 
ThreadData *myThreadData = &m_ThreadData[threadId]; 
std::cout << "Hello world: Thread " << threadId << std::endl; 

// Signal that this thread is ready 
m_ThreadReady_mutex.lock(); 
     myThreadData->ready = true; 
     m_ThreadReady_condvar.notify_one(); // notify the main thread 
m_ThreadReady_mutex.unlock();  

while(true) 
{ 
    //tthread::lock_guard<tthread::mutex> guard(m); 
    m_WorkToDo_mutex.lock(); 

    while(!myThreadData->haveWorkToDo) // check for work to do 
     m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here 
    myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex 

    m_WorkToDo_mutex.unlock(); 

    // Do the work 
    switch(myThreadData->typeOfWork) 
    { 
     case e_work: 
      std::cout << "Thread " << threadId << ": Woken with work to do\n"; 

      // Do work 
      myThreadData->workFunc(myThreadData->workData); 

      std::cout << "#Thread " << threadId << ": Work is completed\n"; 
      break; 

     case e_quit: 
      std::cout << "Thread " << threadId << ": Asked to quit\n"; 
      return; // ends the thread 
    } 

    // Now to signal the main thread that my work is completed 
    m_WorkCompleted_mutex.lock(); 
     m_NumOfThreadsDoingWork--; 

     // Unsure if this 'if' would make the program more efficient 
     // if(m_NumOfThreadsDoingWork == 0) 
      m_WorkCompleted_condvar.notify_one(); // notify the main thread 
    m_WorkCompleted_mutex.unlock();  
    } 

} 


ThreadPool::ThreadPool() 
{ 
    m_numOfThreads = 0; m_NumOfThreadsDoingWork = 0; m_NumOfThreadsGivenJobs = 0; 
} 


ThreadPool::~ThreadPool() 
{ 
    if(m_numOfThreads) 
    { 
    DestroyThreadPool(); 
    delete [] m_ThreadData; 
    } 
} 


void ThreadPool::CreateThreads(int numOfThreads) 
{ 
// Check if a thread pool has already been created 
if(m_numOfThreads > 0) 
    return; 

m_NumOfThreadsGivenJobs = 0; 
m_NumOfThreadsDoingWork = 0; 
m_numOfThreads = numOfThreads; 
m_ThreadData = new ThreadData[m_numOfThreads]; 
ThreadArgStruct threadArg; 

for(int i=0; i<m_numOfThreads; ++i) 
{ 
    threadArg.threadId = i; 
    threadArg.threadPoolInstance = this; 

    // Creates the thread and saves it in a list so we can destroy it later 
    m_ThreadList.push_back(new tthread::thread(ThreadFuncWrapper, (void *)&threadArg )); 

    // It takes a little time for a thread to get established. 
    // Best wait until it gets established before creating the next thread. 
    m_ThreadReady_mutex.lock(); 
    while(!m_ThreadData[i].ready) // Check if thread is ready 
     m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here 
    m_ThreadReady_mutex.unlock();  
} 
} 


// Assigns a job to a thread, but doesn't start the job 
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData) 
{ 
// Check if the thread pool has been created 
if(!m_numOfThreads) 
    return; 

if(m_NumOfThreadsGivenJobs >= m_numOfThreads) 
    return; 

m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc; 
m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData; 

std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl; 

m_NumOfThreadsGivenJobs++; 
} 

void ThreadPool::StartJobs() 
{ 
// Check that the thread pool has been created 
// and some jobs have been assigned 
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs) 
    return; 

// Set 'haveworkToDo' flag for all threads 
m_WorkToDo_mutex.lock(); 
    for(int i=0; i<m_NumOfThreadsGivenJobs; ++i) 
    { 
     m_ThreadData[i].typeOfWork = e_work; // forgot to do this ! 
     m_ThreadData[i].haveWorkToDo = true; 
    } 
    m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs; 

    // Reset this counter so we can resubmit jobs later 
    m_NumOfThreadsGivenJobs = 0; 

    // Notify all threads they have work to do 
    m_WorkToDo_condvar.notify_all(); 
    m_WorkToDo_mutex.unlock(); 
} 


void ThreadPool::WaitForJobsToComplete() 
{ 
    // Check that a thread pool has been created 
    if(!m_numOfThreads) 
    return; 

m_WorkCompleted_mutex.lock(); 
while(m_NumOfThreadsDoingWork > 0) // Check if all threads have completed their work 
    m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here 
m_WorkCompleted_mutex.unlock();  
} 


void ThreadPool::DestroyThreadPool() 
{ 
std::cout << "Ask threads to quit\n"; 
m_WorkToDo_mutex.lock(); 
    for(int i=0; i<m_numOfThreads; ++i) 
    { 
    m_ThreadData[i].haveWorkToDo = true; 
    m_ThreadData[i].typeOfWork = e_quit; 
    } 
    m_WorkToDo_condvar.notify_all(); 
m_WorkToDo_mutex.unlock(); 

// As each thread terminates, catch them here 
for(int i=0; i<m_numOfThreads; ++i) 
{ 
    tthread::thread *t = m_ThreadList[i]; 

    // Wait for thread to complete 
    t->join(); 
} 
m_numOfThreads = 0; 
} 

Esempio di utilizzo: (questo calcola pi-quadrato/6 sommando i reciproci dei quadrati) In realtà, questo esempio di utilizzo esegue lo stesso calcolo 10 volte in parallelo. Un uso più pratico sarebbe per ogni thread per calcolare un diverso insieme di termini sommati. Il risultato finale viene quindi ottenuto aggiungendo tutti i risultati del thread una volta completato il lavoro del pool.

struct CalculationDataStruct 
{ 
int inputVal; 
double outputVal; 
}; 

void LongCalculation(void *theSums) 
{ 
CalculationDataStruct *sums = (CalculationDataStruct *)theSums; 

int terms = sums->inputVal; 
double sum; 
for(int i=1; i<terms; i++) 
    sum += 1.0/(double(i)*double(i)); 
sums->outputVal = sum; 
} 


int main(int argc, char** argv) 
{ 
int numThreads = 10; 

// Create pool 
ThreadPool threadPool; 
threadPool.CreateThreads(numThreads); 

// Create thread workspace 
CalculationDataStruct sums[numThreads]; 

// Set up jobs 
for(int i=0; i<numThreads; i++) 
{ 
    sums[i].inputVal = 3000*(i+1); 
    threadPool.SubmitJob(LongCalculation, &sums[i]); 
} 

// Run the jobs 
threadPool.StartJobs(); 
threadPool.WaitForJobsToComplete(); 

// Print results 
for(int i=0; i<numThreads; i++) 
    std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl; 

return 0; 
} 

Domanda: Nel metodo ThreadPool :: ThreadFunc, sarebbe la migliore prestazione essere ottenuto se la seguente istruzione if

if(NumOfThreadsDoingWork == 0) 

è stato incluso? Inoltre, sarei grato delle critiche e dei modi per migliorare il codice. Allo stesso tempo, spero che il codice sia utile agli altri.

+0

A meno che non ci sia qualcosa per il thread principale da fare dopo il completamento di un singolo lavoro, non c'è alcun punto nella segnalazione fino a quando tutti i lavori sono stati completati. C'è il punto di risvegliare il thread principale se tutto ciò che sta per fare è tornare immediatamente a dormire. Detto questo, dubito che il sovraccarico sia sufficiente per poter misurare la differenza (a meno che il pool di thread non contenga un numero enorme di thread) –

+0

Grazie JF, anche quello era il mio pensiero. Ho provato il codice con e senza la frase 'if' e non sono stato in grado di rilevare alcuna differenza di prestazioni, ma poi di nuovo, stavo usando solo 10 thread, che è il massimo che userò comunque nelle mie app. – ticketman

+0

Risolto un bug che impediva il corretto funzionamento del debug di Windows: aggiunto: m_ThreadData [i] .typeOfWork = e_work; per for-loop in funzione StartJobs(). – ticketman

risposta

1

In primo luogo, si consiglia di esaminare "std::thread" di C++ 11 e "std :: mutex". Si consiglia inoltre di esaminare Intel "Threading Building Blocks" che fornisce una serie di modelli per la distribuzione del lavoro. Per un'API incapsulata, cross-platform, C++, ho generalmente utilizzato lo OpenThreads library. Infine, puoi creare carichi di lavoro distribuibili e scalabili senza mutex utilizzando una libreria che trasmette messaggi, come ZeroMQ.

Guardando il codice corrente, la mia più grande preoccupazione sarebbe che non sembra che stiate bloccando le variabili utilizzate per assegnare il lavoro ai thread; Presumo che sia perché hai separato SubmitJob e StartWork.

Ma in ultima analisi, ThreadPool non è thread-safe.

È anche un po 'una API complessa con i tipi di lavoro, ecc. Probabilmente è necessario astrarre il concetto di "lavoro". Ecco un esempio in cui l'ho fatto, probabilmente vorrai incapsulare la maggior parte del codice nella tua classe ThreadPool; anche il metodo di terminazione (lavoro NULL) è un po 'artificiale, probabilmente si vorrebbe usare pthread_cancel, ma questo ha servito abbastanza bene questa dimostrazione.

#include <queue> 
#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 

static int jobNo = 0; 
class Job { 
public: 
    Job() : m_i(++jobNo) { printf("Created job %d.\n", m_i); } 
    int m_i; 
    void Execute() { printf("Job %d executing.\n", m_i); usleep(500 * 1000); } 
}; 

std::queue<Job*> queue; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 

void AddJob(Job* job) { 
    pthread_mutex_lock(&mutex); 
    queue.push(job); 
    pthread_cond_signal(&cond); 
    pthread_mutex_unlock(&mutex); 
} 

void* QueueWorker(void* /*threadInfo*/) { 
    Job* job = NULL; 
    for (;;) { 
     pthread_mutex_lock(&mutex); 
     while (queue.empty()) { 
      // unlock the mutex until the cond is signal()d or broadcast() to. 
      // if this call succeeds, we will have the mutex locked again on the other side. 
      pthread_cond_wait(&cond, &mutex); 
     } 
     // take the first task and then release the lock. 
     job = queue.front(); 
     queue.pop(); 
     pthread_mutex_unlock(&mutex); 

     if (job == NULL) { 
      // in this demonstration, NULL ends the run, so forward to any other threads. 
      AddJob(NULL); 
      break; 
     } 
     job->Execute(); 
     delete job; 
    } 
    return NULL; 
} 

int main(int argc, const char* argv[]) { 
    pthread_t worker1, worker2; 
    pthread_create(&worker1, NULL, &QueueWorker, NULL); 
    pthread_create(&worker2, NULL, &QueueWorker, NULL); 

    srand(time(NULL)); 

    // queue 5 jobs with delays. 
    for (size_t i = 0; i < 5; ++i) { 
     long delay = (rand() % 800) * 1000; 
     printf("Producer sleeping %fs\n", (float)delay/(1000*1000)); 
     usleep(delay); 
     Job* job = new Job(); 
     AddJob(job); 
    } 
    // 5 more without delays. 
    for (size_t i = 0; i < 5; ++i) { 
     AddJob(new Job); 
    } 
    // null to end the run. 
    AddJob(NULL); 

    printf("Done with jobs.\n"); 
    pthread_join(worker1, NULL); 
    pthread_join(worker2, NULL); 

    return 0; 
}