2012-07-26 10 views
9

Sto usando boost::asio::io_service come un pool di thread di base. Alcuni thread vengono aggiunti a io_service, il thread principale inizia a postare i gestori, i thread worker iniziano ad eseguire i gestori e tutto finisce. Fin qui tutto bene; Ottengo una bella accelerazione rispetto al codice a thread singolo.Impostazione del limite sulle dimensioni della coda di post con Boost Asio?

Tuttavia, il thread principale ha milioni di cose da pubblicare. E continua a postarli, molto più velocemente di quanto i thread worker possano gestirli. Non colgo i limiti di RAM, ma è ancora piuttosto stupido essere in agguato così tante cose. Quello che mi piacerebbe fare è avere una dimensione fissa per la coda del gestore e avere un blocco post() se la coda è piena.

Non vedo alcuna opzione per questo nei documenti ASIO Boost. È possibile?

risposta

0

potresti usare l'oggetto filamento per mettere gli eventi e mettere un ritardo nel tuo principale? Il tuo programma si ritira dopo che tutto il lavoro è stato pubblicato? Se è così, puoi usare l'oggetto di lavoro che ti darà più controllo su quando il tuo io_service si ferma.

si può sempre verificare le condizioni dei thread e attendere che uno sia libero o qualcosa del genere.

// collega

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link 
boost::asio::io_service io_service; 
boost::asio::io_service::work work(io_service); 

speranza che questo aiuti.

+0

Il problema non è che il 'io_service' si arresta prima di completare il lavoro --- sappiamo dell'eliminazione dell'oggetto' work' per fare in modo che il 'io_service' si arresti con garbo. Il problema è che il 'io_service' consente di accumulare troppe attività. Vorremmo limitare il numero di attività non assegnate in un modo che non implichi il polling sulla parte del thread che crea le attività, da qui la nostra domanda sull'opportunità di bloccare 'poll()'. – uckelman

2

Sto usando il semaforo per correggere la dimensione della coda dei gestori. Il codice seguente illustra questa soluzione:

void Schedule(boost::function<void()> function) 
{ 
    semaphore.wait(); 
    io_service.post(boost::bind(&TaskWrapper, function)); 
} 

void TaskWrapper(boost::function<void()> &function) 
{ 
    function(); 
    semaphore.post(); 
} 
1

È possibile avvolgere la lambda in un altro lambda, che si occupa di contare le attività "in-progress", e quindi attendere prima di pubblicare, se ci sono troppe attività in corso .

Esempio:

#include <atomic> 
#include <chrono> 
#include <future> 
#include <iostream> 
#include <mutex> 
#include <thread> 
#include <vector> 
#include <boost/asio.hpp> 

class ThreadPool { 
    using asio_worker = std::unique_ptr<boost::asio::io_service::work>; 
    boost::asio::io_service service; 
    asio_worker service_worker; 
    std::vector<std::thread> grp; 
    std::atomic<int> inProgress = 0; 
    std::mutex mtx; 
    std::condition_variable busy; 
public: 
    ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) { 
    for (int i = 0; i < threads; ++i) { 
     grp.emplace_back([this] { service.run(); }); 
    } 
    } 

    template<typename F> 
    void enqueue(F && f) { 
    std::unique_lock<std::mutex> lock(mtx); 
    // limit queue depth = number of threads 
    while (inProgress >= grp.size()) { 
     busy.wait(lock); 
    } 
    inProgress++; 
    service.post([this, f = std::forward<F>(f)]{ 
     try { 
     f(); 
     } 
     catch (...) { 
     inProgress--; 
     busy.notify_one(); 
     throw; 
     } 
     inProgress--; 
     busy.notify_one(); 
    }); 
    } 

    ~ThreadPool() { 
    service_worker.reset(); 
    for (auto& t : grp) 
     if (t.joinable()) 
     t.join(); 
    service.stop(); 
    } 
}; 

int main() { 
    std::unique_ptr<ThreadPool> pool(new ThreadPool(4)); 
    for (int i = 1; i <= 20; ++i) { 
    pool->enqueue([i] { 
     std::string s("Hello from task "); 
     s += std::to_string(i) + "\n"; 
     std::cout << s; 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 
    }); 
    } 
    std::cout << "All tasks queued.\n"; 
    pool.reset(); // wait for all tasks to complete 
    std::cout << "Done.\n"; 
} 

uscita:

Hello from task 3 
Hello from task 4 
Hello from task 2 
Hello from task 1 
Hello from task 5 
Hello from task 7 
Hello from task 6 
Hello from task 8 
Hello from task 9 
Hello from task 10 
Hello from task 11 
Hello from task 12 
Hello from task 13 
Hello from task 14 
Hello from task 15 
Hello from task 16 
Hello from task 17 
Hello from task 18 
All tasks queued. 
Hello from task 19 
Hello from task 20 
Done. 
0

Forse provare a ridurre la priorità del filo principale in modo che una volta che i thread di ottenere affollato hanno fame thread principale ei limiti del sistema autonomo.