2009-05-29 16 views
8

Vorrei creare una classe i cui metodi possono essere chiamati da più thread. ma invece di eseguire il metodo nel thread da cui è stato chiamato, dovrebbe eseguirli tutti nella propria thread. Nessun risultato deve essere restituito e non dovrebbe bloccare il thread chiamante.Coda eventi/attività Multithreading C++

Un primo tentativo di implementazione che ho incluso di seguito. I metodi pubblici inseriscono un puntatore e dati di funzione in una coda di lavoro, che viene quindi ripresa dal thread di lavoro. Tuttavia non è particolarmente bello codice e l'aggiunta di nuovi metodi è ingombrante.

Idealmente mi piacerebbe usare questo come una classe base che posso facilmente aggiungere metodi (con un numero variabile di argomenti) con minimo hastle e duplicazione del codice.

Qual è un modo migliore per farlo? C'è qualche codice esistente disponibile che fa qualcosa di simile? Grazie

#include <queue> 

using namespace std; 

class GThreadObject 
{ 
    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     void * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 

private: 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionOneProxy(void * buffer); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 



#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* It should block on a condition, but Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    //Execute the function pointer with the attached data 
    (*this.*receivedEvent->funcPtr)(receivedEvent->data); 
} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 

    //Malloc an object the size of the function arguments 
    int argumentSize = sizeof(char*)+sizeof(int); 
    void * myData = malloc(argumentSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, &argOne, argumentSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = myData; 
    myEvent->funcPtr = &GThreadObject::functionOneProxy; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

/* 
* This is the function I would like to remove if possible 
* Split the void * buffer into arguments for the internal Function 
*/ 
void GThreadObject::functionOneProxy(void * buffer) 
{ 
    char * cBuff = (char*)buffer; 
    functionOneInternal((char*)*((unsigned int*)cBuff), (int)*(cBuff+sizeof(char*))); 
}; 

int main() 
{ 
    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 

    return 0; 
} 

risposta

6

C'è la libreria Futures che si sta facendo strada in Boost e la libreria standard C++. C'è anche qualcosa dello stesso genere in ACE, ma mi dispiacerebbe raccomandarlo a chiunque (come già sottolineato da @lothar, è Active Object.)

+0

Stavo cercando il boost :: futures, ma poiché non fa parte di una versione di boost rilasciata, ho dovuto ricorrere al mio ACE fidato :-) – lothar

+0

La libreria dei future farà parte di Boost 1.41. È disponibile anche come parte della mia implementazione della libreria di thread C++ 0x su http://www.stdthread.co.uk –

+0

Grazie, Anthony. Piacere di sentirti :) –

2

La libreria POCO ha qualcosa sulla stessa linea chiamata ActiveMethod (insieme a alcune funzionalità correlate, ad esempio ActiveResult) nella sezione threading. Il codice sorgente è facilmente disponibile e facilmente comprensibile.

1

Per estensibilità e manutenibilità (e altre opzioni) è possibile definire una classe astratta (o un'interfaccia) per il "lavoro" che il thread deve eseguire. Quindi gli utenti del pool di thread implementeranno questa interfaccia e assegneranno il riferimento all'oggetto al pool di thread. Questo è molto simile al design Symbian Active Object: ogni AO sottoclama CActive e deve implementare metodi come Run() e Cancel().

Per semplicità l'interfaccia (classe astratta) potrebbe essere semplice come:

class IJob 
{ 
    virtual Run()=0; 
}; 

Poi il pool di thread, o richieste singolo thread accettare sarebbe avere qualcosa di simile:

class CThread 
{ 
    <...> 
public: 
    void AddJob(IJob* iTask); 
    <...> 
}; 

Naturalmente si sarebbe avere più compiti che possono avere tutti i tipi di setter/getter/attributi aggiuntivi e qualsiasi altra cosa tu abbia bisogno in ogni ambito della vita. Tuttavia, l'unico obbligo è quello di implementare il metodo run(), che svolgerà le lunghe calcoli:

class CDumbLoop : public IJob 
{ 
public: 
    CDumbJob(int iCount) : m_Count(iCount) {}; 
    ~CDumbJob() {}; 
    void Run() 
    { 
     // Do anything you want here 
    } 
private: 
    int m_Count; 
}; 
+0

Questo è il metodo esatto che utilizziamo per lo sviluppo di giochi su sistemi next-gen nella mia azienda (per un bonus di prestazioni aggiunto, cerca di aggiungere elementi alla coda di lavoro tramite metodi lock-free) –

+0

Qualsiasi consiglio sull'implementazione della coda senza blocco? –

+0

Ci sono alcuni buoni esempi di code prive di lock sulla rete (è una delle strutture di lock free più facili da implementare). Questo è quello su cui mi sono tagliato i denti quando mi sono imbattuto nel carrozzone senza serratura: http://www.boyet.com/Articles/LockfreeQueue.html –

2

È possibile risolvere questo problema utilizzando -library filo di Boost. Qualcosa di simile a questo (mezzo-pseudo):


class GThreadObject 
{ 
     ... 

     public: 
       GThreadObject() 
       : _done(false) 
       , _newJob(false) 
       , _thread(boost::bind(&GThreadObject::workerThread, this)) 
       { 
       } 

       ~GThreadObject() 
       { 
         _done = true; 

         _thread.join(); 
       } 

       void functionOne(char *argOne, int argTwo) 
       { 
         ... 

         _jobQueue.push(myEvent); 

         { 
           boost::lock_guard l(_mutex); 

           _newJob = true; 
         } 

         _cond.notify_one(); 
       } 

     private: 
       void workerThread() 
       { 
         while (!_done) { 
           boost::unique_lock l(_mutex); 

           while (!_newJob) { 
             cond.wait(l); 
           } 

           Event *receivedEvent = _jobQueue.front(); 

           ... 
         } 
       } 

     private: 
       volatile bool    _done; 
       volatile bool    _newJob; 
       boost::thread    _thread; 
       boost::mutex    _mutex; 
       boost::condition_variable _cond; 
       std::queue<Event*>  _jobQueue; 
}; 

Inoltre, si prega di notare come RAII ci consentono di ottenere questo codice più piccolo e meglio gestire.

+1

È std :: queue :: push threadsafe? Sembra che la funzione one_ lock_guard debba andare prima della chiamata _jobQueue.push. –

1

Ecco una classe che ho scritto per uno scopo simile (lo uso per la gestione degli eventi, ma ovviamente si potrebbe rinominarlo in ActionQueue e rinominarne i metodi).

si utilizza in questo modo:

Con la funzione che si desidera chiamare: void foo (const int x, const int y) { /*...*/ }

E: EventQueue q;

q.AddEvent (boost :: bind (foo, 10, 20));

Nel thread lavoratore

q.PlayOutEvents();

Nota: Dovrebbe essere abbastanza semplice aggiungere il codice al blocco a condizione per evitare l'utilizzo di cicli della CPU.

Il codice (Visual Studio 2003 con boost 1.34.1):

#pragma once 

#include <boost/thread/recursive_mutex.hpp> 
#include <boost/function.hpp> 
#include <boost/signals.hpp> 
#include <boost/bind.hpp> 
#include <boost/foreach.hpp> 
#include <string> 
using std::string; 


// Records & plays out actions (closures) in a safe-thread manner. 

class EventQueue 
{ 
    typedef boost::function <void()> Event; 

public: 

    const bool PlayOutEvents() 
    { 
     // The copy is there to ensure there are no deadlocks. 
     const std::vector<Event> eventsCopy = PopEvents(); 

     BOOST_FOREACH (const Event& e, eventsCopy) 
     { 
      e(); 
      Sleep (0); 
     } 

     return eventsCopy.size() > 0; 
    } 

    void AddEvent (const Event& event) 
    { 
     Mutex::scoped_lock lock (myMutex); 

     myEvents.push_back (event); 
    } 

protected: 

    const std::vector<Event> PopEvents() 
    { 
     Mutex::scoped_lock lock (myMutex); 

     const std::vector<Event> eventsCopy = myEvents; 
     myEvents.clear(); 

     return eventsCopy; 
    } 

private: 

    typedef boost::recursive_mutex Mutex; 
    Mutex myMutex; 

    std::vector <Event> myEvents; 

}; 

Spero che questo aiuta. :)

Martin Bilski

+0

Consiglio vivamente l'utilizzo di mutex (o qualsiasi altra forma di "primitiva di sincronizzazione") poiché non si adattano affatto a più processori (dopo circa 4-8 diminuiscono effettivamente la perfomance). Guarda nella codifica Lock-free per implementazioni veramente scalabili. Inoltre, se è necessario utilizzare una primitiva di sincronizzazione, utilizzare una sezione critica in quanto sono più veloci di un mutex (mutex è sicuro per il processo, la sezione critica è thread-safe, ovvero utilizza un mutex durante la sincronizzazione tra processi, CS quando si sincronizzano i thread nello stesso processi) –

0

Si dovrebbe dare un'occhiata alla libreria Boost ASIO. È progettato per inviare eventi in modo asincrono. Può essere abbinato alla libreria Boost Thread per creare il sistema che hai descritto.

È necessario creare un'istanza di un singolo oggetto boost::asio::io_service e pianificare una serie di eventi asincroni (boost::asio::io_service::post o boost::asio::io_service::dispatch). Successivamente, si chiama la funzione membro run da n thread. L'oggetto io_service è thread-safe e garantisce che i gestori asincroni vengano inviati solo in una thread da cui è stato chiamato io_service::run.

L'oggetto boost::asio::strand è utile anche per la sincronizzazione di thread semplice.

Per quello che vale, penso che la libreria ASIO sia una soluzione molto elegante a questo problema.

1

Di seguito è un'implementazione che non richiede un metodo "functionProxy". Anche se è più facile aggiungere nuovi metodi, è ancora complicato.

Boost :: Bind e "Futures" sembrano fare molta attenzione a tutto questo. Immagino di dare un'occhiata al codice boost e vedere come funziona. Grazie per i tuoi suggerimenti a tutti.

GThreadObject.h

#include <queue> 

using namespace std; 

class GThreadObject 
{ 

    template <int size> 
    class VariableSizeContainter 
    { 
     char data[size]; 
    }; 

    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     int dataSize; 
     char * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 
    void functionTwo(int argTwo, int arg2); 


private: 
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize); 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionTwoInternal(int argTwo, int arg2); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 

GThreadObject.cpp

#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument. 
    * This is the bit i would like to remove 
    * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize 
    * Subsequent data sizes would need to be added with an else if 
    * */ 
    if (receivedEvent->dataSize == 8) 
    { 
     const int size = 8; 

     void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>); 
     newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr; 

     //Execute the function 
     (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data)); 
    } 

    //Clean up 
    free(receivedEvent->data); 
    delete receivedEvent; 

} 

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize) 
{ 

    //Malloc an object the size of the function arguments 
    void * myData = malloc(argSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, (char*)argStart, argSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = (char*)myData; 
    myEvent->dataSize = argSize; 
    myEvent->funcPtr = funcPtr; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 

} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

void GThreadObject::functionTwo(int argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionTwoInternal(int argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl; 
} 

main.cpp

#include <iostream> 
#include "GThreadObject.h" 

int main() 
{ 

    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 
    myObj.functionTwo(456, 23); 


    return 0; 
} 

Edit: Solo per completezza ho fatto un'implementazione con boost :: bind.Le differenze principali:

queue<boost::function<void()> > jobQueue; 

void GThreadObjectBoost::functionOne(char * argOne, int argTwo) 
{ 
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo)); 

    workerThread(); 
} 

void GThreadObjectBoost::workerThread() 
{ 
    boost::function<void()> func = jobQueue.front(); 
    func(); 
} 

usando l'implementazione spinta per 10.000.000 iterazioni di functionOne() ci sono voluti ~ 19sec. Tuttavia l'implementazione non boost ha richiesto solo ~ 6.5 sec. Quindi circa 3 volte più lento. Sto indovinando che trovare una buona coda non bloccata sarà il collo più grande della bottiglia qui. Ma è ancora una bella differenza.