sockagg.h

/*
 * sockagg.h
 *
 * Generalised Socket Aggregation functions
 *
 * Portable Windows Library
 *
 * Copyright (C) 2005 Post Increment
 *
 * The contents of this file are subject to the Mozilla Public License
 * Version 1.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * The Original Code is Portable Windows Library.
 *
 * The Initial Developer of the Original Code is Post Increment
 *
 * Portions of this code were written with the financial assistance of
 * Metreos Corporation (http://www.metros.com).
 *
 * Contributor(s): ______________________________________.
 *
 * $Revision: 21788 $
 * $Author: rjongbloed $
 * $Date: 2008-12-12 05:42:13 +0000 (Fri, 12 Dec 2008) $
 */


#ifndef PTLIB_SOCKAGG_H
#define PTLIB_SOCKAGG_H

#ifdef P_USE_PRAGMA
#pragma interface
#endif

#include <ptlib.h>
#include <ptlib/sockets.h>

#include <list>
#include <map>


/*
 *  These classes and templates implement a generic thread pooling mechanism
 */

class PThreadPoolBase;

class PThreadPoolWorkerBase : public PThread
{
  public:
    PThreadPoolWorkerBase(PThreadPoolBase & threadPool);

    virtual unsigned GetWorkSize() const = 0;
    virtual void Shutdown() = 0;

    //virtual void OnAddWork(work_base *);
    //virtual void OnRemoveWork(work_base *);

    PThreadPoolBase & pool;
    PBoolean shutdown;
    PMutex workerMutex;
};

class PThreadPoolBase : public PObject
{
  public:
    PThreadPoolBase(unsigned maximum = 10);
    ~PThreadPoolBase();

    virtual PThreadPoolWorkerBase * CreateWorkerThread() = 0;

    virtual PThreadPoolWorkerBase * AllocateWorker();

  protected:
    virtual bool CheckWorker(PThreadPoolWorkerBase * worker);
    void StopWorker(PThreadPoolWorkerBase * worker);
    PMutex listMutex;
    typedef std::vector<PThreadPoolWorkerBase *> WorkerList_t;
    WorkerList_t workers;

    unsigned maxWorkerSize;
};

template <class WorkUnit_T, class WorkerThread_T>
class PThreadPool : public PThreadPoolBase
{
  PCLASSINFO(PThreadPool, PThreadPoolBase);
  public:
    typedef typename std::map<WorkUnit_T *, WorkerThread_T *> WorkUnitMap_T;

    PThreadPool(unsigned maximum = 10)
      : PThreadPoolBase(maximum) { }

    virtual PThreadPoolWorkerBase * CreateWorkerThread()
    { return new WorkerThread_T(*this); }

    bool AddWork(WorkUnit_T * workUnit)
    {
      PWaitAndSignal m(listMutex);

      PThreadPoolWorkerBase * a_worker = AllocateWorker();
      if (a_worker == NULL)
        return false;

      WorkerThread_T * worker = dynamic_cast<WorkerThread_T *>(a_worker);
      workUnitMap.insert(typename WorkUnitMap_T::value_type(workUnit, worker));

      worker->OnAddWork(workUnit);

      return true;
    }

    bool RemoveWork(WorkUnit_T * workUnit)
    {
      PWaitAndSignal m(listMutex);

      // find worker with work unit to remove
      typename WorkUnitMap_T::iterator r = workUnitMap.find(workUnit);
      if (r == workUnitMap.end())
        return false;

      WorkerThread_T * worker = dynamic_cast<WorkerThread_T *>(r->second);

      workUnitMap.erase(r);

      worker->OnRemoveWork(workUnit);

      CheckWorker(worker);

      return true;
    }

  protected:
    WorkUnitMap_T workUnitMap;
};
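
/*
 * Example usage of the thread pool template. This is an illustrative sketch only:
 * MyWork and MyWorker are hypothetical names, not part of PTLib. The worker class
 * must derive from PThreadPoolWorkerBase and, because PThreadPool::AddWork() and
 * RemoveWork() call them directly, must provide OnAddWork()/OnRemoveWork() members
 * taking the work unit type:
 *
 *   struct MyWork {
 *     int id;                         // application-defined payload
 *   };
 *
 *   class MyWorker : public PThreadPoolWorkerBase
 *   {
 *     public:
 *       MyWorker(PThreadPoolBase & threadPool)
 *         : PThreadPoolWorkerBase(threadPool) { }
 *
 *       unsigned GetWorkSize() const  { return (unsigned)work.size(); }
 *       void Shutdown()               { shutdown = PTrue; }
 *
 *       void OnAddWork(MyWork * w)    { PWaitAndSignal m(workerMutex); work.push_back(w); }
 *       void OnRemoveWork(MyWork * w) { PWaitAndSignal m(workerMutex); work.remove(w); }
 *
 *       void Main()                   { while (!shutdown) Sleep(100); }  // process queued work here
 *
 *     protected:
 *       std::list<MyWork *> work;
 *   };
 *
 *   PThreadPool<MyWork, MyWorker> pool(10);   // at most 10 worker threads
 *   MyWork * job = new MyWork;
 *   pool.AddWork(job);                        // assigns the job to a pooled worker
 *   ...
 *   pool.RemoveWork(job);
 */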

#if 0

// aggregator code disabled pending reimplementation


/*

These classes implement a generalised method for aggregating sockets so that they can be handled by a single thread. It is
intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323
and OPAL with a model that provides a better thread-to-call ratio. A more complete explanation of this model can be
found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL", which can be found at:

         http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf

There are two assumptions made by this code:

  1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other
     polling mechanism is less efficient.

  2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of
     other calls that are handled in the same thread.

UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks
for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking
protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect
received data into a buffer until the handling routine decides that a full PDU has been received.

At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed
by the thread. One extra handle for a pipe (or, on Windows, a local socket) is added to each handle list so that the thread can
be woken up to allow the addition or removal of sockets to or from the list.

*/
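
/*
A minimal sketch of the select-plus-wakeup-pipe pattern described above. This is
illustrative only, assuming a POSIX system; it is not the implementation used by
the aggregator classes below:

    #include <sys/select.h>
    #include <unistd.h>
    #include <vector>

    void AggregationLoop(const std::vector<int> & sockets, int wakeupPipeReadFd)
    {
      for (;;) {
        fd_set readFds;
        FD_ZERO(&readFds);

        // the wakeup pipe lets another thread interrupt the select when the
        // socket list needs to be changed
        FD_SET(wakeupPipeReadFd, &readFds);
        int maxFd = wakeupPipeReadFd;

        // add every aggregated socket to the descriptor set
        for (size_t i = 0; i < sockets.size(); ++i) {
          FD_SET(sockets[i], &readFds);
          if (sockets[i] > maxFd)
            maxFd = sockets[i];
        }

        if (::select(maxFd + 1, &readFds, NULL, NULL, NULL) < 0)
          break;                                 // error: leave the loop

        if (FD_ISSET(wakeupPipeReadFd, &readFds)) {
          char dummy;
          ::read(wakeupPipeReadFd, &dummy, 1);   // drain the trigger byte
          continue;                              // rebuild the descriptor list
        }

        for (size_t i = 0; i < sockets.size(); ++i) {
          if (FD_ISSET(sockets[i], &readFds)) {
            // read and process the PDU for this socket here
          }
        }
      }
    }
*/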

#include <ptlib.h>
#include <functional>
#include <vector>

//
// This class encapsulates the system-specific handle needed to specify a socket.
// On Unix systems, this is a simple file handle. This file handle is used to uniquely
// identify the socket and is used in the "select" system call.
// On Windows systems the SOCKET handle is used to identify the socket, but a separate WSAEVENT
// handle is needed for the WSAWaitForMultipleEvents call.
// This is further complicated by the fact that we need to treat some pairs of sockets as a single
// entity (i.e. RTP sockets) to avoid rewriting the RTP handler code.
//

class PAggregatedHandle;

class PAggregatorFD
{
  public:
#ifdef _WIN32
    typedef WSAEVENT FD;
    typedef SOCKET FDType;
    SOCKET socket;
#else
    typedef int FD;
    typedef int FDType;
#endif

    PAggregatorFD(FDType fd);

    FD fd;

    ~PAggregatorFD();
    bool IsValid();
};

typedef std::vector<PAggregatorFD *> PAggregatorFDList_t;

//
// This is an abstract class used to define a handle that can be aggregated.
//
// Normally this will correspond directly to a socket, but for RTP it actually corresponds to two sockets,
// which greatly complicates things.
//

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4127)
#endif

class PAggregatedHandle : public PObject
{
  PCLASSINFO(PAggregatedHandle, PObject);
  public:
    PAggregatedHandle(PBoolean autoDel = false)
      : autoDelete(autoDel)
      , closed(false)
      , beingProcessed(false)
      , preReadDone(false)
    { }

    virtual PAggregatorFDList_t GetFDs() = 0;

    virtual PTimeInterval GetTimeout()
    { return PMaxTimeInterval; }

    virtual PBoolean Init()      { return PTrue; }
    virtual PBoolean PreRead()   { return PTrue; }
    virtual PBoolean OnRead() = 0;
    virtual void DeInit()    { }
    virtual void OnClose()   { }

    virtual PBoolean IsPreReadDone() const
    { return preReadDone; }

    virtual void SetPreReadDone(PBoolean v = PTrue)
    { preReadDone = v; }

    PBoolean autoDelete;
    PBoolean closed;
    PBoolean beingProcessed;

  protected:
    PBoolean preReadDone;
};

#ifdef _MSC_VER
#pragma warning(pop)
#endif
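
/*
 * The GetFDs() interface above is what allows a pair of sockets (such as an RTP/RTCP
 * pair) to be treated as a single aggregated entity. A handle for such a pair might
 * look like the sketch below (illustrative only; AggregatedSocketPair is a hypothetical
 * class, not part of the library):
 *
 *   class AggregatedSocketPair : public PAggregatedHandle
 *   {
 *     public:
 *       AggregatedSocketPair(PUDPSocket & data, PUDPSocket & control)
 *         : dataFd(data.GetHandle()), controlFd(control.GetHandle()) { }
 *
 *       PAggregatorFDList_t GetFDs()
 *       {
 *         // both descriptors are reported, so one worker thread waits on the pair
 *         PAggregatorFDList_t list;
 *         list.push_back(&dataFd);
 *         list.push_back(&controlFd);
 *         return list;
 *       }
 *
 *       PBoolean OnRead()
 *       {
 *         // read whichever socket is ready and process the packet here
 *         return PTrue;
 *       }
 *
 *     protected:
 *       PAggregatorFD dataFd;
 *       PAggregatorFD controlFd;
 *   };
 */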


//
// This class is the actual socket aggregator
//

#ifdef _WIN32

class EventBase
{
  public:
    EventBase()
    {
      event = ::CreateEvent(NULL, PTrue, PFalse, NULL);
      PAssert(event != NULL, "CreateEvent failed");
    }

    ~EventBase()
    { CloseHandle(event); }

    PAggregatorFD::FD GetHandle()
    { return (PAggregatorFD::FD)event; }

    void Set()
    { SetEvent(event); }

    void Reset()
    { ResetEvent(event); }

  protected:
    HANDLE event;
};

#endif

typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t;

class PAggregatorWorker : public PThreadPoolWorkerBase
{
  public:
    PAggregatorWorker(PThreadPoolBase & pool);

    unsigned GetWorkSize() const;
    void Shutdown();

    void OnAddWork(PAggregatedHandle *);
    void OnRemoveWork(PAggregatedHandle *);

    void Main();
    PAggregatedHandleList_t handleList;

    void Trigger()  { localEvent.Set(); }

    PBoolean listChanged;
};

typedef PThreadPool<PAggregatedHandle, PAggregatorWorker> PHandleAggregatorBase;

class PHandleAggregator : public PHandleAggregatorBase
{
  PCLASSINFO(PHandleAggregator, PHandleAggregatorBase)
  public:
    typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t;

    PHandleAggregator(unsigned maximum = 10);

    PBoolean AddHandle(PAggregatedHandle * handle);

    PBoolean RemoveHandle(PAggregatedHandle * handle);
};


//
// This template class allows the creation of aggregators for sockets that are
// descendants of PIPSocket
//

#if 0

template <class PSocketType>
class PSocketAggregator : public PHandleAggregator
{
  PCLASSINFO(PSocketAggregator, PHandleAggregator)
  public:
    class AggregatedPSocket : public PAggregatedHandle
    {
      public:
        AggregatedPSocket(PSocketType * s)
          : psocket(s), fd(s->GetHandle()) { }

        PBoolean OnRead()
        { return psocket->OnRead(); }

        PAggregatorFDList_t GetFDs()
        { PAggregatorFDList_t list; list.push_back(&fd); return list; }

      protected:
        PSocketType * psocket;
        PAggregatorFD fd;
    };

    typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t;
    SocketList_t socketList;

    PBoolean AddSocket(PSocketType * sock)
    {
      PWaitAndSignal m(listMutex);

      AggregatedPSocket * handle = new AggregatedPSocket(sock);
      if (AddHandle(handle)) {
        socketList.insert(SocketList_t::value_type(sock, handle));
        return true;
      }

      delete handle;
      return false;
    }

    PBoolean RemoveSocket(PSocketType * sock)
    {
      PWaitAndSignal m(listMutex);

      typename SocketList_t::iterator r = socketList.find(sock);
      if (r == socketList.end())
        return PFalse;

      AggregatedPSocket * handle = r->second;
      RemoveHandle(handle);
      delete handle;
      socketList.erase(r);
      return PTrue;
    }
};
#endif  // #if 0
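
/*
If the PSocketAggregator template above were enabled, usage might look like the
sketch below. This is illustrative only: MyListenerSocket is a hypothetical class,
and note that the aggregated socket type must provide the OnRead() member that
AggregatedPSocket::OnRead() calls:

    class MyListenerSocket : public PUDPSocket
    {
      public:
        PBoolean OnRead()
        {
          // read and handle one datagram here
          return PTrue;
        }
    };

    PSocketAggregator<MyListenerSocket> aggregator(10);  // up to 10 worker threads

    MyListenerSocket * sock = new MyListenerSocket;
    // ... open/bind the socket as required ...
    aggregator.AddSocket(sock);     // the socket is now serviced by a pooled worker thread
    ...
    aggregator.RemoveSocket(sock);
*/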

#endif

// aggregator code disabled pending reimplementation

#endif // PTLIB_SOCKAGG_H


// End Of File ///////////////////////////////////////////////////////////////
