Drizzled Public API Documentation

barrier.h

00001 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  * Copyright (C) 2010 Brian Aker
00005  * All rights reserved.
00006  *
00007  * Redistribution and use in source and binary forms, with or without
00008  * modification, are permitted provided that the following conditions are met:
00009  *     * Redistributions of source code must retain the above copyright
00010  *       notice, this list of conditions and the following disclaimer.
00011  *     * Redistributions in binary form must reproduce the above copyright
00012  *       notice, this list of conditions and the following disclaimer in the
00013  *       documentation and/or other materials provided with the distribution.
00014  *     * Neither the name of the <organization> nor the
00015  *       names of its contributors may be used to endorse or promote products
00016  *       derived from this software without specific prior written permission.
00017  *
00018  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00019  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00020  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00021  * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
00022  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00023  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00024  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00025  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00026  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00027  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00028  *
00029  */
00030 
00031 #include <boost/thread/mutex.hpp>
00032 #include <boost/thread/condition_variable.hpp>
00033 #include <boost/shared_ptr.hpp>
00034 #include <boost/foreach.hpp>
00035 
00036 #include "observer.h"
00037 
00038 #pragma once
00039 
00040 /*
00041   Barrier was designed with the following concepts.
00042 
00043   1) A barrier can be set with an initial limit which can be used such that if the limit is met, it releases all waiters.
00044   2) A barrier can be released at any time, even if the limit is not met by an outside caller.
00045   3) An observer can register itself to the barrier, it will wait until some predicate action releases it.
00046   4) Observers are always released by limit, or in the case where the barrier is released or destroyed.
00047   5) Observers should be held by copy, not by reference in order to allow for correct deletion.
00048 
00049   @todo While an owner is currently passed to a barrier, the barrier makes no use of it. Protection against
00050   poor use, namely the owner of a barrier calling wait(), is instead done in the layer above. It may be a good idea to change this.
00051 */
00052 
00053 namespace user_locks {
00054 namespace barriers {
00055 
00056 // Barrier starts in a blocking position
00057 class Barrier {
00058 public:
00059   typedef boost::shared_ptr<Barrier> shared_ptr;
00060 
00061   Barrier(drizzled::session_id_t owner_arg) :
00062     owner(owner_arg),
00063     limit(0),
00064     current_wait(0),
00065     generation(0)
00066   { }
00067 
00068   Barrier(drizzled::session_id_t owner_arg, int64_t limit_arg) :
00069     owner(owner_arg),
00070     limit(limit_arg),
00071     current_wait(limit),
00072     generation(0)
00073   {
00074   }
00075 
00076   ~Barrier()
00077   {
00078     wakeAll();
00079   }
00080 
00081   // Signal all of the observers to start
00082   void signal()
00083   {
00084     boost::mutex::scoped_lock scopedBarrier(sleeper_mutex);
00085     wakeAll();
00086   }
00087 
00088   drizzled::session_id_t getOwner() const
00089   {
00090     return owner;
00091   }
00092 
00093   void wait()
00094   {
00095     boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00096     int64_t my_generation= generation;
00097 
00098     --current_wait;
00099     if (limit)
00100     {
00101       if (not current_wait)
00102       {
00103         wakeAll();
00104 
00105         return;
00106       }
00107 
00108     }
00109     checkObservers();
00110 
00111     // If we are interrupted we remove ourself from the list, and check on
00112     // the observers.
00113     try 
00114     {
00115       while (my_generation == generation)
00116       {
00117         sleep_threshhold.wait(sleeper_mutex);
00118       }
00119     }
00120     catch(boost::thread_interrupted const& error)
00121     {
00122       current_wait++;
00123       checkObservers();
00124     }
00125   }
00126 
00127   // A call to either signal or a release will cause wait_for() to continue
00128   void wait_until(int64_t wait_until_arg)
00129   {
00130     Observer::shared_ptr observer;
00131     {
00132       boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00133 
00134       if (wait_until_arg <= count())
00135         return;
00136 
00137       observer.reset(new Observer(wait_until_arg));
00138       observers.push_back(observer);
00139     }
00140 
00141     try {
00142       observer->sleep();
00143     }
00144     catch(boost::thread_interrupted const& error)
00145     {
00146       boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00147       // Someone has interrupted us, we now try to remove ourself from the
00148       // observer chain ourself
00149 
00150       observers.remove(observer);
00151       
00152       throw error;
00153     }
00154   }
00155 
00156   void wait(int64_t generation_arg)
00157   {
00158     boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00159     int64_t my_generation= generation;
00160 
00161     // If the generation is newer  then we just return immediatly
00162     if (my_generation > generation_arg)
00163       return;
00164 
00165     --current_wait;
00166 
00167     if (limit)
00168     {
00169       if (not current_wait)
00170       {
00171         wakeAll();
00172         return;
00173       }
00174 
00175     }
00176 
00177     while (my_generation == generation)
00178     {
00179       sleep_threshhold.wait(sleeper_mutex);
00180     }
00181   }
00182 
00183   int64_t getGeneration()
00184   {
00185     boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00186     return generation;
00187   }
00188 
00189   int64_t sizeObservers()
00190   {
00191     boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00192     return static_cast<int64_t>(observers.size());
00193   }
00194 
00195   int64_t sizeWaiters()
00196   {
00197     boost::mutex::scoped_lock scopedLock(sleeper_mutex);
00198     return count();
00199   }
00200 
00201   int64_t getLimit() const
00202   {
00203     return limit;
00204   }
00205 
00206 private:
00207   void wakeAll()
00208   {
00209     generation++;
00210     current_wait= limit;
00211     sleep_threshhold.notify_all();
00212 
00213     checkObservers();
00214   }
00215 
00216   struct isReady : public std::unary_function<Observer::list::const_reference, bool>
00217   {
00218     const int64_t count;
00219 
00220     isReady(int64_t arg) :
00221       count(arg)
00222     { }
00223 
00224     result_type operator() (argument_type observer)
00225     {
00226       if (observer->getLimit() <= count or count == 0)
00227       {
00228         observer->wake();
00229         return true;
00230       }
00231 
00232       return false;
00233     }
00234   };
00235 
00236   void checkObservers()
00237   {
00238     observers.remove_if(isReady(count()));
00239   }
00240 
00241   int64_t count() const
00242   {
00243     if (limit)
00244     {
00245       return limit - current_wait;
00246     }
00247     return std::abs(static_cast<long int>(current_wait));
00248   }
00249 
00250 
00251   drizzled::session_id_t owner;
00252 
00253   const int64_t limit;
00254   int64_t current_wait;
00255   int64_t generation;
00256 
00257   Observer::list observers;
00258 
00259   boost::mutex sleeper_mutex;
00260   boost::condition_variable_any sleep_threshhold;
00261 
00262 };
00263 
00264 } // namespace barriers
00265 } // namespace user_locks
00266