00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025
00026 #include "mld_threads.h"
00027 #include <stdexcept>
00028
00029 #define DO_DEBUG 0
00030
00031 template <class T> class circular_buffer
00032 {
00033 private:
00034
00035 T* d_buffer;
00036
00037
00038 UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00039 UInt32 d_n_avail_write_I, d_n_avail_read_I;
00040
00041
00042 mld_mutex_ptr d_internal;
00043 mld_condition_ptr d_readBlock, d_writeBlock;
00044
00045
00046 bool d_doWriteBlock, d_doFullRead, d_doAbort;
00047
00048 void delete_mutex_cond () {
00049 if (d_internal) {
00050 delete d_internal;
00051 d_internal = NULL;
00052 }
00053 if (d_readBlock) {
00054 delete d_readBlock;
00055 d_readBlock = NULL;
00056 }
00057 if (d_writeBlock) {
00058 delete d_writeBlock;
00059 d_writeBlock = NULL;
00060 }
00061 };
00062
00063 public:
00064 circular_buffer (UInt32 bufLen_I,
00065 bool doWriteBlock = true, bool doFullRead = false) {
00066 if (bufLen_I == 0)
00067 throw std::runtime_error ("circular_buffer(): "
00068 "Number of items to buffer must be > 0.\n");
00069 d_bufLen_I = bufLen_I;
00070 d_buffer = (T*) new T[d_bufLen_I];
00071 d_doWriteBlock = doWriteBlock;
00072 d_doFullRead = doFullRead;
00073 d_internal = NULL;
00074 d_readBlock = d_writeBlock = NULL;
00075 reset ();
00076 #if DO_DEBUG
00077 fprintf (stderr, "c_b(): buf len (items) = %ld, "
00078 "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
00079 (d_doWriteBlock ? "true" : "false"),
00080 (d_doFullRead ? "true" : "false"));
00081 #endif
00082 };
00083
00084 ~circular_buffer () {
00085 delete_mutex_cond ();
00086 delete [] d_buffer;
00087 };
00088
00089 inline UInt32 n_avail_write_items () {
00090 d_internal->lock ();
00091 UInt32 retVal = d_n_avail_write_I;
00092 d_internal->unlock ();
00093 return (retVal);
00094 };
00095
00096 inline UInt32 n_avail_read_items () {
00097 d_internal->lock ();
00098 UInt32 retVal = d_n_avail_read_I;
00099 d_internal->unlock ();
00100 return (retVal);
00101 };
00102
00103 inline UInt32 buffer_length_items () {return (d_bufLen_I);};
00104 inline bool do_write_block () {return (d_doWriteBlock);};
00105 inline bool do_full_read () {return (d_doFullRead);};
00106
00107 void reset () {
00108 d_doAbort = false;
00109 bzero (d_buffer, d_bufLen_I * sizeof (T));
00110 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00111 d_n_avail_write_I = d_bufLen_I;
00112 delete_mutex_cond ();
00113 d_internal = new mld_mutex ();
00114 d_readBlock = new mld_condition ();
00115 d_writeBlock = new mld_condition ();
00116 };
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139 int enqueue (T* buf, UInt32 bufLen_I) {
00140 #if DO_DEBUG
00141 fprintf (stderr, "enqueue: buf = %X, bufLen = %ld.\n",
00142 (unsigned int)buf, bufLen_I);
00143 #endif
00144 if (bufLen_I > d_bufLen_I) {
00145 fprintf (stderr, "cannot add buffer longer (%ld"
00146 ") than instantiated length (%ld"
00147 ").\n", bufLen_I, d_bufLen_I);
00148 throw std::runtime_error ("circular_buffer::enqueue()");
00149 }
00150
00151 if (bufLen_I == 0)
00152 return (0);
00153 if (!buf)
00154 throw std::runtime_error ("circular_buffer::enqueue(): "
00155 "input buffer is NULL.\n");
00156 d_internal->lock ();
00157 if (d_doAbort) {
00158 d_internal->unlock ();
00159 return (2);
00160 }
00161 if (bufLen_I > d_n_avail_write_I) {
00162 if (d_doWriteBlock) {
00163 while (bufLen_I > d_n_avail_write_I) {
00164 #if DO_DEBUG
00165 fprintf (stderr, "enqueue: #len > #a, waiting.\n");
00166 #endif
00167 d_internal->unlock ();
00168 d_writeBlock->wait ();
00169 d_internal->lock ();
00170 if (d_doAbort) {
00171 d_internal->unlock ();
00172 #if DO_DEBUG
00173 fprintf (stderr, "enqueue: #len > #a, aborting.\n");
00174 #endif
00175 return (2);
00176 }
00177 #if DO_DEBUG
00178 fprintf (stderr, "enqueue: #len > #a, done waiting.\n");
00179 #endif
00180 }
00181 } else {
00182 d_n_avail_read_I = d_bufLen_I - bufLen_I;
00183 d_n_avail_write_I = bufLen_I;
00184 #if DO_DEBUG
00185 fprintf (stderr, "circular_buffer::enqueue: overflow\n");
00186 #endif
00187 return (-1);
00188 }
00189 }
00190 UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00191 if (n_now_I > bufLen_I)
00192 n_now_I = bufLen_I;
00193 else if (n_now_I < bufLen_I)
00194 n_start_I = bufLen_I - n_now_I;
00195 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00196 if (n_start_I) {
00197 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00198 d_writeNdx_I = n_start_I;
00199 } else
00200 d_writeNdx_I += n_now_I;
00201 d_n_avail_read_I += bufLen_I;
00202 d_n_avail_write_I -= bufLen_I;
00203 d_readBlock->signal ();
00204 d_internal->unlock ();
00205 return (1);
00206 };
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230 int dequeue (T* buf, UInt32* bufLen_I) {
00231 #if DO_DEBUG
00232 fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld.\n",
00233 (unsigned int)buf, *bufLen_I);
00234 #endif
00235 if (!bufLen_I)
00236 throw std::runtime_error ("circular_buffer::dequeue(): "
00237 "input bufLen pointer is NULL.\n");
00238 if (!buf)
00239 throw std::runtime_error ("circular_buffer::dequeue(): "
00240 "input buffer pointer is NULL.\n");
00241 UInt32 l_bufLen_I = *bufLen_I;
00242 if (l_bufLen_I == 0)
00243 return (0);
00244 if (l_bufLen_I > d_bufLen_I) {
00245 fprintf (stderr, "cannot remove buffer longer (%ld"
00246 ") than instantiated length (%ld"
00247 ").\n", l_bufLen_I, d_bufLen_I);
00248 throw std::runtime_error ("circular_buffer::dequeue()");
00249 }
00250
00251 d_internal->lock ();
00252 if (d_doAbort) {
00253 d_internal->unlock ();
00254 return (2);
00255 }
00256 if (d_doFullRead) {
00257 while (d_n_avail_read_I < l_bufLen_I) {
00258 #if DO_DEBUG
00259 fprintf (stderr, "dequeue: #a < #len, waiting.\n");
00260 #endif
00261 d_internal->unlock ();
00262 d_readBlock->wait ();
00263 d_internal->lock ();
00264 if (d_doAbort) {
00265 d_internal->unlock ();
00266 #if DO_DEBUG
00267 fprintf (stderr, "dequeue: #a < #len, aborting.\n");
00268 #endif
00269 return (2);
00270 }
00271 #if DO_DEBUG
00272 fprintf (stderr, "dequeue: #a < #len, done waiting.\n");
00273 #endif
00274 }
00275 } else {
00276 while (d_n_avail_read_I == 0) {
00277 #if DO_DEBUG
00278 fprintf (stderr, "dequeue: #a == 0, waiting.\n");
00279 #endif
00280 d_internal->unlock ();
00281 d_readBlock->wait ();
00282 d_internal->lock ();
00283 if (d_doAbort) {
00284 d_internal->unlock ();
00285 #if DO_DEBUG
00286 fprintf (stderr, "dequeue: #a == 0, aborting.\n");
00287 #endif
00288 return (2);
00289 }
00290 #if DO_DEBUG
00291 fprintf (stderr, "dequeue: #a == 0, done waiting.\n");
00292 #endif
00293 }
00294 }
00295 if (l_bufLen_I > d_n_avail_read_I)
00296 l_bufLen_I = d_n_avail_read_I;
00297 UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00298 if (n_now_I > l_bufLen_I)
00299 n_now_I = l_bufLen_I;
00300 else if (n_now_I < l_bufLen_I)
00301 n_start_I = l_bufLen_I - n_now_I;
00302 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00303 if (n_start_I) {
00304 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00305 d_readNdx_I = n_start_I;
00306 } else
00307 d_readNdx_I += n_now_I;
00308 *bufLen_I = l_bufLen_I;
00309 d_n_avail_read_I -= l_bufLen_I;
00310 d_n_avail_write_I += l_bufLen_I;
00311 d_writeBlock->signal ();
00312 d_internal->unlock ();
00313 return (1);
00314 };
00315
00316 void abort () {
00317 d_internal->lock ();
00318 d_doAbort = true;
00319 d_writeBlock->signal ();
00320 d_readBlock->signal ();
00321 d_internal->unlock ();
00322 };
00323 };
00324
00325 #endif