00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <boost/thread/mutex.hpp>
00032 #include <boost/thread/condition_variable.hpp>
00033 #include <boost/shared_ptr.hpp>
00034 #include <boost/foreach.hpp>
00035
00036 #include "observer.h"
00037
00038 #pragma once
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053 namespace user_locks {
00054 namespace barriers {
00055
00056
00057 class Barrier {
00058 public:
00059 typedef boost::shared_ptr<Barrier> shared_ptr;
00060
00061 Barrier(drizzled::session_id_t owner_arg) :
00062 owner(owner_arg),
00063 limit(0),
00064 current_wait(0),
00065 generation(0)
00066 { }
00067
00068 Barrier(drizzled::session_id_t owner_arg, int64_t limit_arg) :
00069 owner(owner_arg),
00070 limit(limit_arg),
00071 current_wait(limit),
00072 generation(0)
00073 {
00074 }
00075
00076 ~Barrier()
00077 {
00078 wakeAll();
00079 }
00080
00081
00082 void signal()
00083 {
00084 boost::mutex::scoped_lock scopedBarrier(sleeper_mutex);
00085 wakeAll();
00086 }
00087
00088 drizzled::session_id_t getOwner() const
00089 {
00090 return owner;
00091 }
00092
00093 void wait()
00094 {
00095 boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00096 int64_t my_generation= generation;
00097
00098 --current_wait;
00099 if (limit)
00100 {
00101 if (not current_wait)
00102 {
00103 wakeAll();
00104
00105 return;
00106 }
00107
00108 }
00109 checkObservers();
00110
00111
00112
00113 try
00114 {
00115 while (my_generation == generation)
00116 {
00117 sleep_threshhold.wait(sleeper_mutex);
00118 }
00119 }
00120 catch(boost::thread_interrupted const& error)
00121 {
00122 current_wait++;
00123 checkObservers();
00124 }
00125 }
00126
00127
00128 void wait_until(int64_t wait_until_arg)
00129 {
00130 Observer::shared_ptr observer;
00131 {
00132 boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00133
00134 if (wait_until_arg <= count())
00135 return;
00136
00137 observer.reset(new Observer(wait_until_arg));
00138 observers.push_back(observer);
00139 }
00140
00141 try {
00142 observer->sleep();
00143 }
00144 catch(boost::thread_interrupted const& error)
00145 {
00146 boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00147
00148
00149
00150 observers.remove(observer);
00151
00152 throw error;
00153 }
00154 }
00155
00156 void wait(int64_t generation_arg)
00157 {
00158 boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00159 int64_t my_generation= generation;
00160
00161
00162 if (my_generation > generation_arg)
00163 return;
00164
00165 --current_wait;
00166
00167 if (limit)
00168 {
00169 if (not current_wait)
00170 {
00171 wakeAll();
00172 return;
00173 }
00174
00175 }
00176
00177 while (my_generation == generation)
00178 {
00179 sleep_threshhold.wait(sleeper_mutex);
00180 }
00181 }
00182
00183 int64_t getGeneration()
00184 {
00185 boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00186 return generation;
00187 }
00188
00189 int64_t sizeObservers()
00190 {
00191 boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00192 return static_cast<int64_t>(observers.size());
00193 }
00194
00195 int64_t sizeWaiters()
00196 {
00197 boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00198 return count();
00199 }
00200
00201 int64_t getLimit() const
00202 {
00203 return limit;
00204 }
00205
00206 private:
00207 void wakeAll()
00208 {
00209 generation++;
00210 current_wait= limit;
00211 sleep_threshhold.notify_all();
00212
00213 checkObservers();
00214 }
00215
00216 struct isReady : public std::unary_function<Observer::list::const_reference, bool>
00217 {
00218 const int64_t count;
00219
00220 isReady(int64_t arg) :
00221 count(arg)
00222 { }
00223
00224 result_type operator() (argument_type observer)
00225 {
00226 if (observer->getLimit() <= count or count == 0)
00227 {
00228 observer->wake();
00229 return true;
00230 }
00231
00232 return false;
00233 }
00234 };
00235
00236 void checkObservers()
00237 {
00238 observers.remove_if(isReady(count()));
00239 }
00240
00241 int64_t count() const
00242 {
00243 if (limit)
00244 {
00245 return limit - current_wait;
00246 }
00247 return std::abs(static_cast<long int>(current_wait));
00248 }
00249
00250
00251 drizzled::session_id_t owner;
00252
00253 const int64_t limit;
00254 int64_t current_wait;
00255 int64_t generation;
00256
00257 Observer::list observers;
00258
00259 boost::mutex sleeper_mutex;
00260 boost::condition_variable_any sleep_threshhold;
00261
00262 };
00263
00264 }
00265 }
00266