2012-01-18 4 views
25

Più leggo più confuso divento ... Avrei pensato che fosse banale trovare una coda mpsc formalmente corretta implementata in C++.Esiste una coda lock-free per singolo consumatore con più produttori per C++?

Ogni volta che trovo un altro tentativo, ulteriori ricerche sembrano suggerire che ci siano problemi come ABA o altre sottili condizioni di gara.

Molti parlano della necessità di garbage collection. Questo è qualcosa che voglio evitare.

Esiste una corretta implementazione open source corretta?

+0

Il mio consiglio è di cercare alternative. Per prestazioni, bloccare le code libere non sono grandiose. Non è possibile fare nulla velocemente ogni volta che si hanno diversi thread che scrivono sulla stessa riga della cache. L'uso della coda SPSC separata per ciascun produttore è molto più veloce. Lo svantaggio è che si perde l'ordine degli articoli. Se hai bisogno di una coda ordinata e vuoi risparmiare un sacco di mal di testa, usa solo uno spinlock. In pratica è quasi buono come lockfree e centinaia di volte più veloce di ad es. Sezioni critiche Win32. – Timo

+0

Hai pensato di utilizzare il ring buffer invece della coda? Esistono alcuni vantaggi in termini di prestazioni: posizione di memoria fissa con slot assegnati una volta, contatori molto semplici da puntare a coda/testa senza necessità di blocco, prevedibile accesso alla memoria, ecc. – bronekk

+0

@bronekk puoi elaborare per favore? Hai un esempio funzionante per caso? –

risposta

10

Si consiglia di controllare il disturbatore; è disponibile in C++ qui: http://lmax-exchange.github.io/disruptor/

trova anche spiegazione Funziona here on stackoverflow sostanza è buffer circolare senza bloccaggio, ottimizzata per passare messaggi FIFO tra i thread in slot di dimensioni fisse.

Qui ci sono due implementazioni che ho trovato utile: Lock-free Multi-producer Multi-consumer Queue on Ring Buffer @ NatSys Lab. Blog e
Yet another implementation of a lock-free circular array queue @ CodeProject

NOTA: il codice qui sotto non è corretto, lo lascio solo come un esempio di come ingannevole queste cose possono essere.

Se non ti piace la complessità della versione di google, qui è qualcosa di simile da me - è molto più semplice, ma lascio come esercizio per il lettore per farlo funzionare (è parte del progetto più ampio, non portatile al momento). L'idea generale è di mantenere il buffer cirtulare per i dati e una piccola serie di contatori per identificare gli slot per scrivere/scrivere e leggere/leggere. Poiché ogni contatore si trova nella propria linea di cache e (normalmente) ognuno viene aggiornato solo atomicamente una volta nella vita di un messaggio, possono essere letti senza alcuna sincronizzazione. Esiste un potenziale punto di conflitto tra la scrittura di thread in post_done, è necessario per la garanzia FIFO. I contatori (head_, wrtn_, rdng_, tail_) sono stati selezionati per garantire la correttezza e FIFO, quindi la caduta di FIFO richiederebbe anche il cambio di contatori (e questo potrebbe essere difficile da fare senza sacrificare la correttezza).È possibile migliorare leggermente le prestazioni per gli scenari con un solo consumatore, ma non mi preoccuperei: dovresti annullare l'operazione se vengono trovati altri casi d'uso con più lettori.

Sulla mia latenza macchina si presenta come segue (percentile a sinistra, significa che all'interno di questa percentile sulla destra, l'unità è microsecondo, misurata dal rdtsc):

total=1000000 samples, avg=0.24us 
    50%=0.214us, avg=0.093us 
    90%=0.23us, avg=0.151us 
    99%=0.322us, avg=0.159us 
    99.9%=15.566us, avg=0.173us 

Questi risultati di un singolo consumatore polling, ovvero thread di lavoro chiamando wheel.read() in un circuito chiuso e controllando se non è vuoto (scorrere verso il basso per esempio). I consumatori in attesa (utilizzo della CPU molto più basso) attendono l'evento (una delle funzioni acquire...), questo aggiunge circa 1-2us alla latenza media dovuta allo switch di contesto.

Poiché c'è una piccola contesa in lettura, i consumatori si adattano molto bene al numero di thread di lavoro, ad es. per 3 discussioni sulla mia macchina:

total=1500000 samples, avg=0.07us 
    50%=0us, avg=0us 
    90%=0.155us, avg=0.016us 
    99%=0.361us, avg=0.038us 
    99.9%=8.723us, avg=0.044us 

Le patch saranno i benvenuti :)

// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki 
// 
// Distributed under the Boost Software License, Version 1.0. (See accompanying 
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 

#pragma once 

#include <core/api.hxx> 
#include <core/wheel/exception.hxx> 

#include <boost/noncopyable.hpp> 
#include <boost/type_traits.hpp> 
#include <boost/lexical_cast.hpp> 
#include <typeinfo> 

namespace core { namespace wheel 
{ 
    struct bad_size : core::exception 
    { 
    template<typename T> explicit bad_size(const T&, size_t m) 
     : core::exception(std::string("Slot capacity exceeded, sizeof(") 
        + typeid(T).name() 
        + ") = " 
        + boost::lexical_cast<std::string>(sizeof(T)) 
        + ", capacity = " 
        + boost::lexical_cast<std::string>(m) 
       ) 
    {} 
    };   

    // inspired by Disruptor 
    template <typename Header> 
    class wheel : boost::noncopyable 
    { 
    __declspec(align(64)) 
    struct slot_detail 
    { 
     // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel) 
     // slot read: (memory barrier in wheel) > read_done > (memory barrier in wheel) 

     // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate 
     template <bool Writing> 
     void done(wheel* w) 
     { 
     if (Writing) 
      w->post_done(sequence); 
     else 
      w->read_done(); 
     } 

     // cache line for sequence number and header 
     long long sequence; 
     Header header; 

     // there is no such thing as data type with variable size, but we need it to avoid thrashing 
     // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element. 
     // This is well into UB territory! Using template parameter for this is not good, since it 
     // results in this small implementation detail leaking to all possible user interfaces. 
     __declspec(align(8)) 
     char data[8]; 
    }; 

    // use this as a storage space for slot_detail, to guarantee 64 byte alignment 
    _declspec(align(64)) 
    struct slot_block { long long padding[8]; }; 

    public: 
    // wrap slot data to outside world 
    template <bool Writable> 
    class slot 
    { 
     template<typename> friend class wheel; 

     slot& operator=(const slot&); // moveable but non-assignable 

     // may only be constructed by wheel 
     slot(slot_detail* impl, wheel<Header>* w, size_t c) 
     : slot_(impl) , wheel_(w) , capacity_(c) 
     {} 

    public: 
     slot(slot&& s) 
     : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_) 
     { 
     s.slot_ = NULL; 
     } 

     ~slot() 
     { 
     if (slot_) 
     { 
      slot_->done<Writable>(wheel_); 
     } 
     } 

     // slot accessors - use Header to store information on what type is actually stored in data 
     bool empty() const   { return !slot_; } 
     long long sequence() const { return slot_->sequence; } 
     Header& header()   { return slot_->header; } 
     char* data()    { return slot_->data; } 

     template <typename T> T& cast() 
     { 
     static_assert(boost::is_pod<T>::value, "Data type must be POD"); 
     if (sizeof(T) > capacity_) 
      throw bad_size(T(), capacity_); 
     if (empty()) 
      throw no_data(); 
     return *((T*) data()); 
     } 

    private: 
     slot_detail* slot_; 
     wheel<Header>* wheel_; 
     const size_t capacity_; 
    }; 

    private: 
    // dynamic size of slot, with extra capacity, expressed in 64 byte blocks 
    static size_t sizeof_slot(size_t s) 
    { 
     size_t m = sizeof(slot_detail); 
     // add capacity less 8 bytes already within sizeof(slot_detail) 
     m += max(8, s) - 8; 
     // round up to 64 bytes, i.e. alignment of slot_detail 
     size_t r = m & ~(unsigned int)63; 
     if (r < m) 
     r += 64; 
     r /= 64; 
     return r; 
    } 

    // calculate actual slot capacity back from number of 64 byte blocks 
    static size_t slot_capacity(size_t s) 
    { 
     return s*64 - sizeof(slot_detail) + 8; 
    } 

    // round up to power of 2 
    static size_t round_size(size_t s) 
    { 
     // enfore minimum size 
     if (s <= min_size) 
     return min_size; 

     // find rounded value 
     --s; 
     size_t r = 1; 
     while (s) 
     { 
     s >>= 1; 
     r <<= 1; 
     }; 
     return r; 
    } 

    slot_detail& at(long long sequence) 
    { 
     // find index from sequence number and return slot at found index of the wheel 
     return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]); 
    } 

    public: 
    wheel(size_t capacity, size_t size) 
     : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_() 
     , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size)) 
    { 
     static_assert(boost::is_pod<Header>::value, "Header type must be POD"); 
     static_assert(sizeof(slot_block) == 64, "This was unexpected"); 

     wheel_ = new slot_block[size_ * blocks_]; 
     // all slots must be initialised to 0 
     memset(wheel_, 0, size_ * 64 * blocks_); 
     active_ = 1; 
    } 

    ~wheel() 
    { 
     stop(); 
     delete[] wheel_; 
    } 

    // all accessors needed 
    size_t capacity() const { return capacity_; } // capacity of a single slot 
    size_t size() const  { return size_; }  // number of slots available 
    size_t queue() const { return (size_t)head_ - (size_t)tail_; } 
    bool active() const  { return active_ == 1; } 

    // enough to call it just once, to fine tune slot capacity 
    template <typename T> 
    void check() const 
    { 
     static_assert(boost::is_pod<T>::value, "Data type must be POD"); 
     if (sizeof(T) > capacity_) 
     throw bad_size(T(), capacity_); 
    } 

    // stop the wheel - safe to execute many times 
    size_t stop() 
    { 
     InterlockedExchange(&active_, 0); 
     // must wait for current read to complete 
     while (rdng_ != tail_) 
     Sleep(10); 

     return size_t(head_ - tail_); 
    } 

    // return first available slot for write 
    slot<true> post() 
    { 
     if (!active_) 
     throw stopped(); 

     // the only memory barrier on head seq. number we need, if not overflowing 
     long long h = InterlockedIncrement64(&head_); 
     while(h - (long long) size_ > tail_) 
     { 
     if (InterlockedDecrement64(&head_) == h - 1) 
      throw overflowing(); 

     // protection against case of race condition when we are overflowing 
     // and two or more threads try to post and two or more messages are read, 
     // all at the same time. If this happens we must re-try, otherwise we 
     // could have skipped a sequence number - causing infinite wait in post_done 
     Sleep(0); 
     h = InterlockedIncrement64(&head_); 
     } 

     slot_detail& r = at(h); 
     r.sequence = h; 

     // wrap in writeable slot 
     return slot<true>(&r, this, capacity_); 
    } 

    // return first available slot for write, nothrow variant 
    slot<true> post(std::nothrow_t) 
    { 
     if (!active_) 
     return slot<true>(NULL, this, capacity_); 

     // the only memory barrier on head seq. number we need, if not overflowing 
     long long h = InterlockedIncrement64(&head_); 
     while(h - (long long) size_ > tail_) 
     { 
     if (InterlockedDecrement64(&head_) == h - 1) 
      return slot<true>(NULL, this, capacity_); 

     // must retry if race condition described above 
     Sleep(0); 
     h = InterlockedIncrement64(&head_); 
     } 

     slot_detail& r = at(h); 
     r.sequence = h; 

     // wrap in writeable slot 
     return slot<true>(&r, this, capacity_); 
    } 

    // read first available slot for read 
    slot<false> read() 
    { 
     slot_detail* r = NULL; 
     // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier 
     if (active_ && rdng_ < wrtn_) 
     { 
     // the only memory barrier on reading seq. number we need 
     const long long h = InterlockedIncrement64(&rdng_); 
     // check if this slot has been written, step back if not 
     if (h > wrtn_) 
      InterlockedDecrement64(&rdng_); 
     else 
      r = &at(h); 
     } 

     // wrap in readable slot 
     return slot<false>(r , this, capacity_); 
    } 

    // waiting for new post, to be used by non-polling clients 
    void acquire() 
    { 
     event_.acquire(); 
    } 

    bool try_acquire() 
    { 
     return event_.try_acquire(); 
    } 

    bool try_acquire(unsigned long timeout) 
    { 
     return event_.try_acquire(timeout); 
    } 

    void release() 
    {} 

    private: 
    void post_done(long long sequence) 
    { 
     const long long t = sequence - 1; 

     // the only memory barrier on written seq. number we need 
     while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t) 
     Sleep(0); 

     // this is outside of critical path for polling clients 
     event_.set(); 
    } 

    void read_done() 
    { 
     // the only memory barrier on tail seq. number we need 
     InterlockedIncrement64(&tail_); 
    } 

    // each in its own cache line 
    // head_ - wrtn_ = no. of messages being written at this moment 
    // rdng_ - tail_ = no. of messages being read at the moment 
    // head_ - tail_ = no. of messages to read (including those being written and read) 
    // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read) 
    __declspec(align(64)) volatile long long head_; // currently writing or written 
    __declspec(align(64)) volatile long long wrtn_; // written 
    __declspec(align(64)) volatile long long rdng_; // currently reading or read 
    __declspec(align(64)) volatile long long tail_; // read 
    __declspec(align(64)) volatile long active_; // flag switched to 0 when stopped 

    __declspec(align(64)) 
    api::event event_;   // set when new message is posted 
    const size_t blocks_;  // number of 64-byte blocks in a single slot_detail 
    const size_t capacity_;  // capacity of data() section per single slot. Initialisation depends on blocks_ 
    const size_t size_;   // number of slots available, always power of 2 
    slot_block* wheel_; 
    }; 
}} 

Ecco cosa polling thread di lavoro consumatore può apparire come:

while (wheel.active()) 
    { 
    core::wheel::wheel<int>::slot<false> slot = wheel.read(); 
    if (!slot.empty()) 
    { 
     Data& d = slot.cast<Data>(); 
     // do work 
    } 
    // uncomment below for waiting consumer, saving CPU cycles 
    // else 
    // wheel.try_acquire(10); 
    } 

A cura consumatore aggiunto esempio

+0

per favore, puoi spiegare quali sono l'intestazione e i dati/qual è la differenza? Se voglio memorizzare 3 * parole a 64 bit in ogni slot (che è l'insieme del payload) come utilizzerei questo? –

0

Suppongo che non esista alcuna cosa del genere e, in tal caso, non è portatile o non è open source.

Concettualmente, si sta tentando di controllare due puntatori contemporaneamente: il puntatore tail e il puntatore tail->next. Questo non può essere fatto generalmente con i primitivi senza blocco.

+0

L'ipotesi è sbagliata. I produttori devono solo spostare una coda. Quello che stai descrivendo è una coda intrusiva. In tal caso, è possibile aggiornare il tail-> next e quindi spostare atomicamente la coda. Se non ci sei riuscito, riavvia il ciclo. – edwinc

3

L'implementazione più adatta dipende dalle proprietà desiderate di una coda. Dovrebbe essere illimitato o uno limitato va bene? Dovrebbe essere linearizable, o requisiti meno severi andrebbero bene? Quanto è forte la FIFO che ti garantisce? Sei disposto a pagare il costo del ripristino della lista da parte del consumatore (esiste un'implementazione molto semplice in cui il consumatore afferra la coda di una lista unificata, ottenendo così tutti gli articoli messi dai produttori fino al momento)? Dovrebbe garantire che nessun thread sia mai bloccato, o che le piccole possibilità di ottenere un thread bloccato siano ok? E ecc

Alcuni link utili:
Is multiple-producer, single-consumer possible in a lockfree setting?
http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3

Speranza che aiuta.

+1

Ho creato una versione basata sull'implementazione di Dmitry Vyukov: https://github.com/samanbarghi/MPSCQ/ –

0

Di seguito è la tecnica Ho usato per la mia libreria Cooperative Multi-tasking/Multi-threading (MACE) http://bytemaster.github.com/mace/. Ha il vantaggio di essere privo di blocco tranne quando la coda è vuota.

struct task { 
    boost::function<void()> func; 
    task* next; 
}; 


boost::mutex      task_ready_mutex; 
boost::condition_variable  task_ready; 
boost::atomic<task*>    task_in_queue; 

// this can be called from any thread 
void thread::post_task(task* t) { 
    // atomically post the task to the queue. 
    task* stale_head = task_in_queue.load(boost::memory_order_relaxed); 
    do { t->next = stale_head; 
    } while(!task_in_queue.compare_exchange_weak(stale_head, t, boost::memory_order_release)); 

    // Because only one thread can post the 'first task', only that thread will attempt 
    // to aquire the lock and therefore there should be no contention on this lock except 
    // when *this thread is about to block on a wait condition. 
    if(!stale_head) { 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex); 
     task_ready.notify_one(); 
    } 
} 

// this is the consumer thread. 
void process_tasks() { 
    while(!done) { 
    // this will atomically pop everything that has been posted so far. 
    pending = task_in_queue.exchange(0,boost::memory_order_consume); 
    // pending is a linked list in 'reverse post order', so process them 
    // from tail to head if you want to maintain order. 

    if(!pending) { // lock scope 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex);     
     // check one last time while holding the lock before blocking. 
     if(!task_in_queue) task_ready.wait(lock); 
    } 
} 
+0

Credo che dovrebbe essere in sospeso = task_in_queue.exchange (0, boost :: memory_order_acquire); poiché nello standard ISO C++ 11 è indicato 29.3.2 "Un'operazione atomica A che esegue un'operazione di rilascio su un oggetto atomico M si sincronizza con un'operazione atomica B che esegue un'operazione di acquisizione su M e ne acquisisce il valore da qualsiasi lato effetto nella sequenza intestata da A. " – ipapadop

+0

Non direi che il blocco è libero quando la "coda" non è vuota, a causa dell'allocazione/recupero della memoria –

Problemi correlati