2011-01-02 14 views
17

Sto cercando un'implementazione consigliata di una coda bloccante thread-safe (multi-produttore/multi-consumatore) in C che utilizzi le primitive di sincronizzazione pthread. (Titolo: coda bloccante sincronizzata con pthread)

risposta

25

Ecco quello che uso

threadqueue.h

#ifndef _THREADQUEUE_H_ 
#define _THREADQUEUE_H_ 1 

#include <pthread.h> 

#ifdef __cplusplus 
extern "C" { 
#endif 
/** 
* @defgroup ThreadQueue ThreadQueue 
* 
* Little API for waitable queues, typically used for passing messages 
* between threads. 
* 
*/ 

/** 
* @mainpage 
    */ 

/** 
* A thread message. 
* 
* @ingroup ThreadQueue 
* 
* This is passed to #thread_queue_get to retrieve messages. 
* The data is stored in the #data member, the message type in #msgtype. 
* 
* Typical: 
* @code 
* struct threadmsg msg; 
* struct myfoo *foo; 
* while(1) { 
*  ret = thread_queue_get(&queue,NULL,&msg); 
*  .. 
*  foo = msg.data; 
*  switch(msg.msgtype){ 
*    ... 
*  } 
* } 
* @endcode 
* 
*/ 
struct threadmsg{ 
     /** 
     * Holds the data. 
     */ 
     void *data; 
     /** 
     * Holds the message type. 
     */ 
     long msgtype; 
     /** 
     * Holds the queue length at the time the message was retrieved. 
     * Might not be meaningful if there are several readers. 
     */ 
     long qlength; 

}; 


/** 
* A ThreadQueue 
* 
* @ingroup ThreadQueue 
* 
* You should treat this struct as opaque, never ever set/get any 
* of the variables. You have been warned. 
*/ 
struct threadqueue { 
/** 
* Length of the queue, never set this, never read this. 
* Use #thread_queue_length to read it. 
*/ 
     long length; 
/** 
* Mutex for the queue, never touch. 
*/ 
     pthread_mutex_t mutex; 
/** 
* Condition variable for the queue, never touch. 
*/ 
     pthread_cond_t cond; 
/** 
* Internal pointers to the first and last queued message, never touch. 
*/ 
     struct msglist *first,*last; 
/** 
* Internal cache of unused msglist nodes kept for reuse. 
*/ 
    struct msglist *msgpool; 
/** 
* Number of elements in the msgpool. 
*/ 
    long msgpool_length; 
}; 

/** 
* Initializes a queue. 
* 
* @ingroup ThreadQueue 
* 
* thread_queue_init initializes a new threadqueue. A new queue must always 
* be initialized before it is used. 
* 
* @param queue Pointer to the queue that should be initialized 
* @return 0 on success, EINVAL if queue is NULL, otherwise the error code 
*         returned by pthread_cond_init or pthread_mutex_init 
*/ 
int thread_queue_init(struct threadqueue *queue); 

/** 
* Adds a message to a queue 
* 
* @ingroup ThreadQueue 
* 
* thread_queue_add adds a "message" to the specified queue; a message 
* is just a pointer to anything of the user's choice. Nothing is copied, 
* so the user must keep track of (de)allocation of the data. 
* A message type is also specified; it is not used for anything other than 
* being given back when the message is retrieved from the queue. 
* 
* @param queue Pointer to the queue to which the message should be added. 
* @param data the "message". 
* @param msgtype a long specifying the message type, choice of the user. 
* @return 0 on success, ENOMEM if out of memory, EINVAL if queue is NULL 
*         (NOTE(review): confirm the implementation checks queue for NULL) 
*/ 
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype); 

/** 
* Gets a message from a queue 
* 
* @ingroup ThreadQueue 
* 
* thread_queue_get gets a message from the specified queue. It will block 
* the calling thread until a message arrives, or the (optional) timeout occurs. 
* If timeout is NULL, there will be no timeout, and thread_queue_get will wait 
* until a message arrives. The timeout is relative: it is added to the 
* current time to form the deadline. 
* 
* struct timespec is defined as: 
* @code 
*  struct timespec { 
*     long tv_sec;   // seconds 
*     long tv_nsec;  // nanoseconds 
*    }; 
* @endcode 
* 
* @param queue Pointer to the queue to wait on for a message. 
* @param timeout how long to wait for a message, or NULL to wait forever 
* @param msg pointer that is filled in with message type, data and queue length 
* 
* @return 0 on success, EINVAL if queue or msg is NULL, ETIMEDOUT if the timeout occurs 
*/ 
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg); 


/** 
* Gets the length of a queue 
* 
* @ingroup ThreadQueue 
* 
* thread_queue_length returns the number of messages waiting in the queue. 
* The value is read while holding the queue's mutex. 
* 
* @param queue Pointer to the queue for which to get the length 
* @return the length (number of pending messages) of the queue 
*/ 
long thread_queue_length(struct threadqueue *queue); 

/** 
* @ingroup ThreadQueue 
* Cleans up the queue. 
* 
* thread_queue_cleanup cleans up and destroys the queue. 
* This will remove all messages from a queue, and reset it. If 
* freedata is != 0, free(3) will be called on the data of all pending messages 
* in the queue. 
* You cannot call this if someone is currently adding or getting messages 
* from the queue. 
* After a queue has been cleaned, it cannot be used again until #thread_queue_init 
* has been called on the queue. 
* 
* @param queue Pointer to the queue that should be cleaned 
* @param freedata set to nonzero if free(3) should be called on remaining 
* messages 
* @return 0 on success, EINVAL if queue is NULL, EBUSY if someone is holding any locks on the queue 
*/ 
int thread_queue_cleanup(struct threadqueue *queue, int freedata); 

#ifdef __cplusplus 
} 
#endif 

#endif 

threadqueue.c

#include <stdlib.h> 
#include <string.h> 
#include <errno.h> 
#include <pthread.h> 
#include <sys/time.h> 
#include "../h/threadqueue.h" 


#define MSGPOOL_SIZE 256 

/* Singly linked list node; used both for queued messages and for the pool of spare nodes. */
struct msglist { 
    struct threadmsg msg;  /* the message carried by this node */
    struct msglist *next;  /* following node in the list, NULL at the tail */
}; 

/*
 * Obtain a list node for a new message.
 *
 * Takes the head of the queue's node pool when the pool is non-empty;
 * otherwise allocates a fresh node with malloc. The node's contents are
 * not initialised here. This function performs no locking itself.
 *
 * Returns the node, or NULL when the pool is empty and malloc fails.
 */
static inline struct msglist *get_msglist(struct threadqueue *queue)
{
    struct msglist *node = queue->msgpool;

    if (node == NULL) {
        /* Pool is empty: fall back to the heap. */
        return malloc(sizeof *node);
    }

    /* Unlink the pooled node from the front of the pool list. */
    queue->msgpool = node->next;
    queue->msgpool_length--;
    return node;
}

/*
 * Give a node back after its message has been consumed.
 *
 * The node is pushed onto the queue's pool for reuse unless the pool already
 * exceeds length/8 + MSGPOOL_SIZE entries, in which case it is freed.
 * Afterwards, if the pool is larger than length/4 + MSGPOOL_SIZE*10, one
 * pooled node is freed, so an oversized pool shrinks by one node per call.
 * This function performs no locking itself.
 */
static inline void release_msglist(struct threadqueue *queue,struct msglist *node)
{
    /* queue->length is not modified here, so both limits can be computed once. */
    const long cache_limit = queue->length / 8 + MSGPOOL_SIZE;
    const long trim_limit = queue->length / 4 + MSGPOOL_SIZE * 10;

    if (queue->msgpool_length <= cache_limit) {
        /* Scrub the payload and push the node onto the pool. */
        node->msg.data = NULL;
        node->msg.msgtype = 0;
        node->next = queue->msgpool;
        queue->msgpool = node;
        queue->msgpool_length++;
    } else {
        free(node);
    }

    if (queue->msgpool_length > trim_limit) {
        /* Pool is far too large for the current queue length: drop its head. */
        struct msglist *victim = queue->msgpool;

        queue->msgpool = victim->next;
        free(victim);
        queue->msgpool_length--;
    }
}

/*
 * Prepare a queue for use.
 *
 * Zeroes the whole structure, then creates the condition variable and the
 * mutex with default attributes. If the mutex cannot be created, the
 * condition variable is destroyed again.
 *
 * Returns 0 on success, EINVAL if queue is NULL, otherwise the error code
 * from pthread_cond_init or pthread_mutex_init.
 */
int thread_queue_init(struct threadqueue *queue)
{
    int rc;

    if (!queue) {
        return EINVAL;
    }

    memset(queue, 0, sizeof *queue);

    rc = pthread_cond_init(&queue->cond, NULL);
    if (rc == 0) {
        rc = pthread_mutex_init(&queue->mutex, NULL);
        if (rc != 0) {
            /* Undo the half-finished initialisation. */
            pthread_cond_destroy(&queue->cond);
        }
    }

    return rc;
}

/*
 * Append a message to the tail of the queue.
 *
 * The data pointer is stored as-is (nothing is copied) together with the
 * caller-chosen msgtype. When the queue goes from empty to non-empty, every
 * thread waiting on the queue's condition variable is woken up.
 *
 * Returns 0 on success, EINVAL if queue is NULL, ENOMEM if no list node
 * could be obtained.
 */
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype)
{
    struct msglist *newmsg;

    /* Reject a NULL queue before touching its mutex. */
    if (queue == NULL) {
        return EINVAL;
    }

    pthread_mutex_lock(&queue->mutex);
    newmsg = get_msglist(queue);
    if (newmsg == NULL) {
        pthread_mutex_unlock(&queue->mutex);
        return ENOMEM;
    }
    newmsg->msg.data = data;
    newmsg->msg.msgtype = msgtype;
    newmsg->next = NULL;

    /* Link the node at the tail; an empty queue gets it as head and tail. */
    if (queue->last == NULL) {
        queue->first = newmsg;
    } else {
        queue->last->next = newmsg;
    }
    queue->last = newmsg;

    /* Only the empty -> non-empty transition can have waiters to wake. */
    if (queue->length == 0) {
        pthread_cond_broadcast(&queue->cond);
    }
    queue->length++;
    pthread_mutex_unlock(&queue->mutex);

    return 0;
}

/*
 * Remove the message at the head of the queue, blocking while it is empty.
 *
 * queue:   queue to read from.
 * timeout: relative time to wait, or NULL to wait indefinitely. It is added
 *          to the current wall-clock time to form the absolute deadline.
 * msg:     receives the data pointer, the message type and the queue length
 *          remaining after the removal.
 *
 * Returns 0 on success, EINVAL if queue or msg is NULL or timeout->tv_nsec
 * is outside [0, 1e9), ETIMEDOUT if the deadline passed with the queue still
 * empty, or any other error reported by the condition wait.
 */
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg)
{
    struct msglist *firstrec;
    int ret = 0;
    struct timespec abstimeout;

    if (queue == NULL || msg == NULL) {
        return EINVAL;
    }
    if (timeout) {
        struct timeval now;

        /* A denormalised tv_nsec would make the sum below overflow or
         * produce a deadline the condition wait rejects. */
        if (timeout->tv_nsec < 0 || timeout->tv_nsec >= 1000000000L) {
            return EINVAL;
        }

        gettimeofday(&now, NULL);
        abstimeout.tv_sec = now.tv_sec + timeout->tv_sec;
        abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec;
        /* Both addends are below one second, so one carry is enough. */
        if (abstimeout.tv_nsec >= 1000000000L) {
            abstimeout.tv_sec++;
            abstimeout.tv_nsec -= 1000000000L;
        }
    }

    pthread_mutex_lock(&queue->mutex);

    /* Loop to absorb spurious wakeups; stop on ANY wait error so that a
     * failing wait cannot turn into a busy loop. */
    while (queue->first == NULL) {
        if (timeout) {
            ret = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
        } else {
            ret = pthread_cond_wait(&queue->cond, &queue->mutex);
        }
        if (ret != 0) {
            break;
        }
    }

    /* Report the failure only if there really is nothing to hand out: a
     * message that arrived just as the wait timed out is still delivered. */
    if (queue->first == NULL) {
        pthread_mutex_unlock(&queue->mutex);
        return ret;
    }

    firstrec = queue->first;
    queue->first = queue->first->next;
    queue->length--;

    if (queue->first == NULL) {
        queue->last = NULL;  /* safe: we hold the lock */
        queue->length = 0;
    }

    msg->data = firstrec->msg.data;
    msg->msgtype = firstrec->msg.msgtype;
    msg->qlength = queue->length;

    release_msglist(queue,firstrec);
    pthread_mutex_unlock(&queue->mutex);

    return 0;
}

/*
 * Free every node owned by the queue and destroy its mutex and condition
 * variable.
 *
 * Both the list of pending messages and the pool of spare nodes are
 * released. When freedata is nonzero, free() is also called on each node's
 * data pointer. The list pointers and counters are reset afterwards so the
 * structure holds no dangling pointers.
 *
 * Returns 0 on success, EINVAL if queue is NULL, otherwise the first error
 * reported by pthread_mutex_destroy or pthread_cond_destroy.
 *
 * TODO: maybe the caller should supply a callback for cleaning the elements.
 */
int thread_queue_cleanup(struct threadqueue *queue, int freedata)
{
    struct msglist *rec;
    struct msglist *next;
    struct msglist *recs[2];
    int ret, cond_ret, i;

    if (queue == NULL) {
        return EINVAL;
    }

    pthread_mutex_lock(&queue->mutex);
    recs[0] = queue->first;
    recs[1] = queue->msgpool;
    for (i = 0; i < 2; i++) {
        rec = recs[i];
        while (rec) {
            next = rec->next;
            if (freedata) {
                free(rec->msg.data);
            }
            free(rec);
            rec = next;
        }
    }

    /* Everything reachable from these pointers has just been freed. */
    queue->first = NULL;
    queue->last = NULL;
    queue->msgpool = NULL;
    queue->length = 0;
    queue->msgpool_length = 0;

    pthread_mutex_unlock(&queue->mutex);
    ret = pthread_mutex_destroy(&queue->mutex);
    cond_ret = pthread_cond_destroy(&queue->cond);

    /* Always attempt both destroys; surface the first failure. */
    return ret != 0 ? ret : cond_ret;
}

/*
 * Report how many messages are currently queued.
 *
 * The counter is read while holding the queue's mutex; the value may be
 * out of date as soon as the mutex is released.
 */
long thread_queue_length(struct threadqueue *queue)
{
    pthread_mutex_lock(&queue->mutex);
    const long pending = queue->length;
    pthread_mutex_unlock(&queue->mutex);

    return pending;
}
+0

Ci sono molte situazioni in cui `thread_queue_length()` può essere usato senza incorrere in una race condition? – caf

+0

No. Dipende dalle tue esigenze: `thread_queue_length` restituisce la lunghezza della coda in quel preciso momento, indipendentemente da quanti messaggi siano stati aggiunti o elaborati nell'ultimo nanosecondo. Lo uso semplicemente per registrare periodicamente nel log la dimensione della coda. – nos

+0

`add` e `get` usano entrambi lo stesso mutex. Se `get` acquisisce il mutex e resta in attesa del `broadcast` da parte di `add`... non si crea un deadlock, dato che `add` non potrebbe acquisire il mutex? – Mike