2012-08-28 23 views
6

Ho implementato un pool di thread utilizzando boost::asio e alcuni oggetti boost::thread chiamando boost::asio::io_service::run(). Tuttavia, un requisito che mi è stato dato è di avere un modo per monitorare tutti i thread per "salute". Il mio intento è quello di creare un oggetto sentinella semplice che possa essere passato attraverso il pool di thread: se lo supera, possiamo supporre che il thread stia ancora elaborando il lavoro.boost :: asio, pool di thread e monitoraggio thread

Tuttavia, data la mia implementazione, non sono sicuro di come (se) posso controllare tutti i thread nel pool in modo affidabile. Ho semplicemente delegato la funzione thread a boost::asio::io_service::run(), quindi l'invio di un oggetto sentinella nell'istanza io_service non garantisce quale thread effettivamente riceverà quel sentinel e farà il lavoro.

Un'opzione può essere quella di inserire periodicamente la sentinella e sperare che venga catturata da ciascun thread almeno una volta in un ragionevole lasso di tempo, ma ovviamente non è l'ideale.

Prendete il seguente esempio. A causa del modo in cui il gestore è codificato, in questo caso possiamo vedere che ogni thread farà la stessa quantità di lavoro, ma in realtà non avrò il controllo dell'implementazione del gestore, alcuni possono essere di lunga durata mentre altri saranno quasi immediato.

#include <iostream> 
#include <boost/asio.hpp> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

void handler() 
{ 
    std::cout << boost::this_thread::get_id() << "\n"; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 
} 

int main(int argc, char **argv) 
{ 
    boost::asio::io_service svc(3); 

    std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc)); 

    boost::thread one(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread two(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread three(boost::bind(&boost::asio::io_service::run, &svc)); 

    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 

    work.reset(); 

    three.join(); 
    two.join(); 
    one.join(); 

    return 0; 
} 

risposta

2

La soluzione che ho usato si basa sul fatto che possiedo l'attuazione degli oggetti della piscina del battistrada. Ho creato un tipo di wrapper che aggiornerà le statistiche e copierà i gestori definiti dall'utente che vengono pubblicati nel pool di thread. Solo questo tipo di wrapper viene mai pubblicato sul sottostante io_service. Questo metodo mi permette di tenere traccia dei gestori che vengono pubblicati/eseguiti, senza dover essere invadenti nel codice utente.

Ecco un esempio ridotta e semplificata:

#include <iostream> 
#include <memory> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/asio.hpp> 

// Supports scheduling anonymous jobs that are 
// executable as returning nothing and taking 
// no arguments 
typedef std::function<void(void)> functor_type; 

// some way to store per-thread statistics 
typedef std::map<boost::thread::id, int> thread_jobcount_map; 

// only this type is actually posted to 
// the asio proactor, this delegates to 
// the user functor in operator() 
struct handler_wrapper 
{ 
    handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics) 
     : user_functor_(user_functor) 
     , statistics_(statistics) 
    { 
    } 

    void operator()() 
    { 
     user_functor_(); 

     // just for illustration purposes, assume a long running job 
     boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 

     // increment executed jobs 
     ++statistics_[boost::this_thread::get_id()]; 
    } 

    functor_type   user_functor_; 
    thread_jobcount_map& statistics_; 
}; 

// anonymous thread function, just runs the proactor 
void thread_func(boost::asio::io_service& proactor) 
{ 
    proactor.run(); 
} 

class ThreadPool 
{ 
public: 
    ThreadPool(size_t thread_count) 
    { 
     threads_.reserve(thread_count); 

     work_.reset(new boost::asio::io_service::work(proactor_)); 

     for(size_t curr = 0; curr < thread_count; ++curr) 
     { 
     boost::thread th(thread_func, boost::ref(proactor_)); 

     // inserting into this map before any work can be scheduled 
     // on it, means that we don't have to look it for lookups 
     // since we don't dynamically add threads 
     thread_jobcount_.insert(std::make_pair(th.get_id(), 0)); 

     threads_.emplace_back(std::move(th)); 
     } 
    } 

    // the only way for a user to get work into 
    // the pool is to use this function, which ensures 
    // that the handler_wrapper type is used 
    void schedule(const functor_type& user_functor) 
    { 
     handler_wrapper to_execute(user_functor, thread_jobcount_); 
     proactor_.post(to_execute); 
    } 

    void join() 
    { 
     // join all threads in pool: 
     work_.reset(); 
     proactor_.stop(); 

     std::for_each(
     threads_.begin(), 
     threads_.end(), 
     [] (boost::thread& t) 
     { 
     t.join(); 
     }); 
    } 

    // just an example showing statistics 
    void log() 
    { 
     std::for_each(
     thread_jobcount_.begin(), 
     thread_jobcount_.end(), 
     [] (const thread_jobcount_map::value_type& it) 
     { 
     std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n"; 
     }); 
    } 

private: 
    std::vector<boost::thread> threads_; 
    std::unique_ptr<boost::asio::io_service::work> work_; 
    boost::asio::io_service proactor_; 
    thread_jobcount_map  thread_jobcount_; 
}; 

struct add 
{ 
    add(int lhs, int rhs, int* result) 
     : lhs_(lhs) 
     , rhs_(rhs) 
     , result_(result) 
    { 
    } 

    void operator()() 
    { 
     *result_ = lhs_ + rhs_; 
    } 

    int lhs_,rhs_; 
    int* result_; 
}; 

int main(int argc, char **argv) 
{ 
    // some "state objects" that are 
    // manipulated by the user functors 
    int x = 0, y = 0, z = 0; 

    // pool of three threads 
    ThreadPool pool(3); 

    // schedule some handlers to do some work 
    pool.schedule(add(5, 4, &x)); 
    pool.schedule(add(2, 2, &y)); 
    pool.schedule(add(7, 8, &z)); 

    // give all the handlers time to execute 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 

    std::cout 
     << "x = " << x << "\n" 
     << "y = " << y << "\n" 
     << "z = " << z << "\n"; 

    pool.join(); 

    pool.log(); 
} 

uscita:

x = 9 
y = 4 
z = 15 
Thread: 0000000000B25430 executed 1 jobs 
Thread: 0000000000B274F0 executed 1 jobs 
Thread: 0000000000B27990 executed 1 jobs 
+0

puoi aggiungere il codice alla tua risposta @Chad? –

+0

Fatto. Felice per qualsiasi feedback su di esso. – Chad

6

È possibile utilizzare un'istanza io_service comune tra tutti i thread e un'istanza io_service privato per ogni thread. Ogni filo eseguirà un metodo come questo:

void Mythread::threadLoop() 
{ 
    while(/* termination condition */) 
    { 
     commonIoService.run_one(); 
     privateIoService.run_one(); 

     commonConditionVariable.timed_wait(time); 
    } 
} 

In questo modo, se si vuole garantire che qualche compito viene eseguito in un thread, è sufficiente inviare questo compito nella sua io_service di proprietà.

di inviare un'attività nel pool di thread si può fare:

void MyThreadPool::post(Hander handler) 
{ 
    commonIoService.post(handler); 
    commonConditionVariable.notify_all(); 
} 
+0

Un approccio interessante, ma sto cercando qualcosa di un po 'più semplice. Se non emergerà nient'altro nei prossimi giorni, potrei accettare questa risposta. – Chad

+0

Penso che non ci sia una soluzione più semplice usando boot asio. Ho sviluppato una soluzione come questa con pochi codici e funziona. –

Problemi correlati