7. Portable Thread safe Queue

Both enqueue and dequeue operations are critical sections protected by a mutex.

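When the queue is empty, the dequeue operation waits on a condition variable signaled by the enqueue operation; likewise, when the queue is full, the enqueue operation waits on the same condition variable until a dequeue makes room.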

Start cpp section to pthread/pthread_sleep_queue.hpp[1 /1 ]
     1: #line 1241 "./lpsrc/flx_pthread.pak"
     2: #ifndef __FLX_PTHREAD_SLEEP_QUEUE_H__
     3: #define __FLX_PTHREAD_SLEEP_QUEUE_H__
     4: #include <flx_pthread_config.hpp>
     5: #include "pthread_mutex.hpp"
     6: #include "pthread_condv.hpp"
     7: 
     8: // Interface for a consumer/producer queue. Threads requesting a resource
     9: // that isn't there block until one is available. enqueue/dequeue are thread safe.
    10: 
    11: namespace flx { namespace pthread {
    12: 
    13: // ********************************************************
    14: /// Thread safe bounded queue of void* elements.
    15: ///
    16: /// Writers can be locked out by setting bound=0; in that state the
    17: /// queue can only be unlocked by setting a non-zero bound.
    18: ///
    19: /// If the bound is set to 1 (the constructor declares no default),
    20: /// then the queue is always either empty or full.
    21: /// An empty queue blocks readers until a writer sends some data.
    22: /// A full queue blocks writers until a reader reads the data.
    23: /// Note that when the queue is empty a writer can write data
    24: /// and continue without waiting for the data to be read.
    25: // ********************************************************
    26: 
    27: class PTHREAD_EXTERN sleep_queue_t {
    28:   flx_condv_t size_changed;   // broadcast after every push, pop and resize
    29:   void* lame_opaque;          // owned std::queue<void*>*; NOTE(review): no copy control here, a copy would double-delete -- confirm the members forbid copying
    30:   size_t bound;               // enqueue blocks while size >= bound; 0 blocks every writer
    31: public:
    32:   flx_mutex_t member_lock;    // taken by every member function except the constructor and destructor
    33:   sleep_queue_t(size_t n);    // n is the initial bound
    34:   ~sleep_queue_t();           // deletes the container; does not synchronise with other threads
    35:   void enqueue(void* elt);    // append elt, blocking while the queue is at or over its bound
    36:   void* dequeue();            // remove and return the oldest element, blocking while empty
    37:   void resize(size_t n);      // set a new bound and wake all waiters
    38:   void wait_until_empty();    // block until the queue holds no elements
    39: };
    40: 
    41: }} // namespace pthread, flx
    42: #endif
    43: 
End cpp section to pthread/pthread_sleep_queue.hpp[1]
Start cpp section to pthread/pthread_sleep_queue.cpp[1 /1 ]
     1: #line 1285 "./lpsrc/flx_pthread.pak"
     2: #include "pthread_sleep_queue.hpp"
     3: #include <queue>        // stl to the bloated rescue
     4: #include <string.h>       // strerror
     5: 
     6: using namespace std;
     7: 
     8: namespace flx { namespace pthread {
     9: typedef std::queue<void*> void_queue;   // the concrete container stored behind lame_opaque
    10: // ELTQ: the current object's lame_opaque viewed as the void_queue it points to.
    11: #define ELTQ (static_cast<void_queue*>(lame_opaque))
    12: // Build an empty queue whose bound is n; the container lives on the heap behind lame_opaque.
    13: sleep_queue_t::sleep_queue_t(size_t n) : bound(n)
    14: {
    15:   lame_opaque = static_cast<void*>(new void_queue());
    16: }
    17: 
    18: // Destroying a queue needs great care: nothing here waits for, or wakes,
    19: // threads still using it. A safe shutdown is possible but is not provided.
    20: sleep_queue_t::~sleep_queue_t()
    21: {
    22:   delete static_cast<void_queue*>(lame_opaque);  // frees the container only, never the queued void* items
    23: }
    24: // Block the caller until the queue holds no elements.
    25: void sleep_queue_t::wait_until_empty() {
    26:   flx_mutex_locker_t guard(member_lock);
    27:   // Re-test after every wakeup: size_changed is broadcast on every push, pop and resize.
    28:   while (ELTQ->size() > 0) size_changed.wait(&member_lock);
    29: }
    30: // Append elt, first blocking for as long as the queue already holds `bound` or more elements.
    31: void sleep_queue_t::enqueue(void* elt)
    32: {
    33:   flx_mutex_locker_t guard(member_lock);
    34:   void_queue &pending = *ELTQ;
    35:   // Loop rather than `if`: a wakeup does not guarantee room (guard against spurious wakeups!).
    36:   while (!(pending.size() < bound)) size_changed.wait(&member_lock);
    37:   pending.push(elt);
    38:   size_changed.broadcast();   // wake every waiter so it re-tests; cannot return an error
    39: }
    40: // Remove and return the oldest element, blocking for as long as the queue is empty.
    41: void* sleep_queue_t::dequeue()
    42: {
    43:   flx_mutex_locker_t guard(member_lock);
    44:   void_queue &pending = *ELTQ;
    45:   // Loop rather than `if`: guard against spurious wakeups and against another reader getting there first.
    46:   while (pending.size() == 0) size_changed.wait(&member_lock);
    47:   void *const oldest = pending.front();
    48:   pending.pop();
    49:   size_changed.broadcast();   // wake every waiter: writers wanting room, wait_until_empty() callers
    50:   return oldest;
    51: }
    52: // Replace the bound with n. Every waiter is then woken so that it re-tests
    53: // its condition against the new bound (with n == 0 writers simply block again).
    54: void sleep_queue_t::resize(size_t n)
    55: {
    56:   flx_mutex_locker_t guard(member_lock);
    57:   bound = n;
    58:   // get things rolling again
    59:   size_changed.broadcast();
    60: }
    61: 
    62: }}
    63: 
    64: 
End cpp section to pthread/pthread_sleep_queue.cpp[1]