2012-06-20 10 views
5

Implementerò boost :: asio server con un pool di thread utilizzando il singolo io_service (HTTP Server 3 example). io_service sarà associato a unix dominio socket e passerà le richieste che vanno da connessioni su questo socket a thread diversi. Per ridurre il consumo di risorse, voglio rendere dinamico il pool di thread.Esempio di pool di thread dinamici in boost :: asio

Ecco un concetto. Innanzitutto viene creato un singolo thread. Quando arriva una richiesta e il server vede che non c'è un thread inattivo in un pool, crea un nuovo thread e passa la richiesta ad esso. Il server può creare fino a un numero massimo di thread. Idealmente dovrebbe avere la funzionalità di sospendere i thread che sono inattivi per qualche tempo.

Qualcuno ha fatto qualcosa di simile? O forse qualcuno ha un esempio pertinente?

Per quanto mi riguarda, suppongo che dovrei in qualche modo sovrascrivere lo io_service.dispatch per riuscirci.

risposta

5

Ci possono essere alcune sfide con l'approccio iniziale:

  • boost::asio::io_service non è destinato ad essere derivati ​​da o reimplementato. Notare la mancanza di funzioni virtuali.
  • Se la libreria di thread non fornisce la possibilità di interrogare lo stato di un thread, le informazioni di stato devono essere gestite separatamente.

Una soluzione alternativa è quella di inviare un lavoro nella io_service, quindi controllare quanto tempo seduto nel io_service. Se il delta temporale tra quando era pronto per l'esecuzione e quando è stato effettivamente eseguito è superiore a una determinata soglia, significa che ci sono più lavori in coda rispetto ai thread che gestiscono la coda. Un importante vantaggio è che la logica di crescita del pool di thread dinamici viene sganciata da un'altra logica.

Ecco un esempio che consente di ottenere ciò utilizzando deadline_timer.

  • Impostare deadline_timer per scadere 3 secondi da ora.
  • Asincronicamente attendere il deadline_timer. Il gestore sarà pronto per l'esecuzione 3 secondi da quando è stato impostato lo deadline_timer.
  • Nel gestore asincrono, controllare l'ora corrente relativa a quando il timer è stato impostato per scadere. Se è maggiore di 2 secondi, viene eseguito il backup della coda io_service, quindi aggiungere un thread al pool di thread.

Esempio:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 

class thread_pool_checker 
    : private boost::noncopyable 
{ 
public: 

    thread_pool_checker(boost::asio::io_service& io_service, 
         boost::thread_group& threads, 
         unsigned int max_threads, 
         long threshold_seconds, 
         long periodic_seconds) 
    : io_service_(io_service), 
     timer_(io_service), 
     threads_(threads), 
     max_threads_(max_threads), 
     threshold_seconds_(threshold_seconds), 
     periodic_seconds_(periodic_seconds) 
    { 
     schedule_check(); 
    } 

private: 

    void schedule_check(); 
    void on_check(const boost::system::error_code& error); 

private: 

    boost::asio::io_service& io_service_; 
    boost::asio::deadline_timer timer_; 
    boost::thread_group&  threads_; 
    unsigned int    max_threads_; 
    long      threshold_seconds_; 
    long      periodic_seconds_; 
}; 

void thread_pool_checker::schedule_check() 
{ 
    // Thread pool is already at max size. 
    if (max_threads_ <= threads_.size()) 
    { 
    std::cout << "Thread pool has reached its max. Example will shutdown." 
       << std::endl; 
    io_service_.stop(); 
    return; 
    } 

    // Schedule check to see if pool needs to increase. 
    std::cout << "Will check if pool needs to increase in " 
      << periodic_seconds_ << " seconds." << std::endl; 
    timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_)); 
    timer_.async_wait( 
    boost::bind(&thread_pool_checker::on_check, this, 
       boost::asio::placeholders::error)); 
} 

void thread_pool_checker::on_check(const boost::system::error_code& error) 
{ 
    // On error, return early. 
    if (error) return; 

    // Check how long this job was waiting in the service queue. This 
    // returns the expiration time relative to now. Thus, if it expired 
    // 7 seconds ago, then the delta time is -7 seconds. 
    boost::posix_time::time_duration delta = timer_.expires_from_now(); 
    long wait_in_seconds = -delta.seconds(); 

    // If the time delta is greater than the threshold, then the job 
    // remained in the service queue for too long, so increase the 
    // thread pool. 
    std::cout << "Job job sat in queue for " 
      << wait_in_seconds << " seconds." << std::endl; 
    if (threshold_seconds_ < wait_in_seconds) 
    { 
    std::cout << "Increasing thread pool." << std::endl; 
    threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, 
        &io_service_)); 
    } 

    // Otherwise, schedule another pool check. 
    run(); 
} 

// Busy work functions. 
void busy_work(boost::asio::io_service&, 
       unsigned int); 

void add_busy_work(boost::asio::io_service& io_service, 
        unsigned int count) 
{ 
    io_service.post(
    boost::bind(busy_work, 
       boost::ref(io_service), 
       count)); 
} 

void busy_work(boost::asio::io_service& io_service, 
       unsigned int count) 
{ 
    boost::this_thread::sleep(boost::posix_time::seconds(5)); 

    count += 1; 

    // When the count is 3, spawn additional busy work. 
    if (3 == count) 
    { 
    add_busy_work(io_service, 0); 
    } 
    add_busy_work(io_service, count); 
} 

int main() 
{ 
    using boost::asio::ip::tcp; 

    // Create io service. 
    boost::asio::io_service io_service; 

    // Add some busy work to the service. 
    add_busy_work(io_service, 0); 

    // Create thread group and thread_pool_checker. 
    boost::thread_group threads; 
    thread_pool_checker checker(io_service, threads, 
           3, // Max pool size. 
           2, // Create thread if job waits for 2 sec. 
           3); // Check if pool needs to grow every 3 sec. 

    // Start running the io service. 
    io_service.run(); 

    threads.join_all(); 

    return 0; 
} 

uscita:

Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 7 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 4 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 3 seconds. 
Increasing thread pool. 
Thread pool has reached its max. Example will shutdown.
+1

Se ho capito bene, i compiti busy_work può aspettare in coda per i secondi così come correttore piscina anche se numero massimo di thread non è stato raggiunto ancora perché i nuovi thread non vengono creati prima del tempo. Ciò rende difficilmente utilizzabile questo principio perché la caratteristica di essere dinamici non dovrebbe degradare così tanto le prestazioni. Dovrebbe rendere l'esecuzione dell'attività più lunga solo del tempo necessario per una nuova creazione del thread rispetto al tempo necessario con il pool statico. Grazie lo stesso. – boqapt

+0

@ user484936: la tua comprensione è corretta.La crescita del pool si verifica dopo che è stata rilevata una degradazione; è uno degli approcci più semplici al pooling e non dovrebbe "degradare le prestazioni". Se si desidera allocare i thread _quando si sa_ che sono necessari, è necessario gestire lo stato dei thread, introducendo un sovraccarico per tutti i thread e potrebbe richiedere la dispersione della logica di stato in tutto il codice. Se si desidera allocare i thread _ come si prevede che saranno necessari, quindi disporre di una thread dedicata che inserisce un lavoro nel servizio, quindi attendere per una risposta a tempo. –

+0

Mi chiedo cosa succede in uno scenario in cui è stata eseguita solo un'attività a lunga esecuzione e aggiungiamo inutilmente un thread al pool quando il timer scatta. Se in quel momento non c'è più nessun evento da elaborare, allora questo approccio mi sembra inefficiente. Perfavore, correggimi se sbaglio. – russoue