sockagg.h
Go to the documentation of this file.
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef PTLIB_SOCKAGG_H
00036 #define PTLIB_SOCKAGG_H
00037
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041
00042 #include <ptlib.h>
00043 #include <ptlib/sockets.h>
00044
00045 #include <list>
00046 #include <map>
00047
00048
00049
00050
00051
00052
00053 class PThreadPoolBase;
00054 // Abstract interface for a worker thread managed by a PThreadPoolBase.
00055 class PThreadPoolWorkerBase : public PThread
00056 {
00057 public:
00058 PThreadPoolWorkerBase(PThreadPoolBase & threadPool); // defined out of line; presumably binds `pool` to threadPool -- confirm
00059
00060 virtual unsigned GetWorkSize() const = 0; // presumably the amount of work currently assigned to this worker -- TODO confirm
00061 virtual void Shutdown() = 0; // presumably asks the worker to stop; the exact contract is left to the subclass
00062
00063 // NOTE(review): the data members below are declared in the public section,
00064 // so the pool and any other code can read and write them directly.
00065
00066 PThreadPoolBase & pool; // reference member, so the class cannot be copy-assigned
00067 PBoolean shutdown; // presumably set when the worker should exit -- confirm against the implementation
00068 PMutex workerMutex; // presumably guards this worker's own state -- confirm
00069 };
00070 // Abstract base of the thread pool: holds the list of worker pointers and listMutex.
00071 class PThreadPoolBase : public PObject
00072 {
00073 public:
00074 PThreadPoolBase(unsigned maximum = 10); // `maximum` defaults to 10; presumably stored in maxWorkerSize -- confirm
00075 ~PThreadPoolBase();
00076 // Factory hook: each concrete pool supplies its own worker type.
00077 virtual PThreadPoolWorkerBase * CreateWorkerThread() = 0;
00078 // PThreadPool::AddWork treats a NULL result from this call as failure.
00079 virtual PThreadPoolWorkerBase * AllocateWorker();
00080
00081 protected:
00082 virtual bool CheckWorker(PThreadPoolWorkerBase * worker); // called by PThreadPool::RemoveWork after a unit is removed; semantics are defined out of line
00083 void StopWorker(PThreadPoolWorkerBase * worker);
00084 PMutex listMutex; // locked by PThreadPool::AddWork and RemoveWork for their whole duration
00085 typedef std::vector<PThreadPoolWorkerBase *> WorkerList_t; // NOTE(review): <vector> is not included directly in the active part of this header; presumably it arrives via ptlib.h -- confirm
00086 WorkerList_t workers;
00087
00088 unsigned maxWorkerSize; // presumably a limit taken from the constructor's `maximum` -- TODO confirm whether it is per worker or per pool
00089 };
00090 // Typed thread pool: records which WorkerThread_T each WorkUnit_T was handed to.
00091 template <class WorkUnit_T, class WorkerThread_T>
00092 class PThreadPool : public PThreadPoolBase
00093 {
00094     PCLASSINFO(PThreadPool, PThreadPoolBase);
00095   public:
00096     typedef typename std::map<WorkUnit_T *, WorkerThread_T *> WorkUnitMap_T;
00097
00098     PThreadPool(unsigned maximum = 10)
00099       : PThreadPoolBase(maximum) { }
00100     // Factory hook for the base class: builds a WorkerThread_T bound to this pool.
00101     virtual PThreadPoolWorkerBase * CreateWorkerThread()
00102     { return new WorkerThread_T(*this); }
00103     // Maps workUnit to a worker and notifies it; returns false if no usable worker was obtained.
00104     bool AddWork(WorkUnit_T * workUnit)
00105     {
00106       PWaitAndSignal m(listMutex);
00107
00108       // dynamic_cast of a NULL pointer yields NULL, so a single check covers both
00109       // "no worker allocated" and "worker is not a WorkerThread_T".
00110       WorkerThread_T * worker = dynamic_cast<WorkerThread_T *>(AllocateWorker());
00111       if (worker == NULL)
00112         return false;
00113
00114       workUnitMap.insert(typename WorkUnitMap_T::value_type(workUnit, worker));
00115       worker->OnAddWork(workUnit);
00116
00117       return true;
00118     }
00119     // Unmaps workUnit and notifies its worker; returns false if workUnit is not in the map.
00120     bool RemoveWork(WorkUnit_T * workUnit)
00121     {
00122       PWaitAndSignal m(listMutex);
00123
00124
00125       typename WorkUnitMap_T::iterator r = workUnitMap.find(workUnit);
00126       if (r == workUnitMap.end())
00127         return false;
00128       // The map already stores WorkerThread_T *, so no cast is needed here.
00129       WorkerThread_T * worker = r->second;
00130
00131       workUnitMap.erase(r);
00132
00133       worker->OnRemoveWork(workUnit);
00134
00135       CheckWorker(worker);
00136
00137       return true;
00138     }
00139
00140   protected:
00141     WorkUnitMap_T workUnitMap;
00142 };
00143
00144 #if 0
00145
00146
00147
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178 #include <ptlib.h>
00179 #include <functional>
00180 #include <vector>
00181
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193 class PAggregatedHandle;
00194 // Wraps the platform-specific descriptor type used by the aggregator classes.
00195 class PAggregatorFD
00196 {
00197 public:
00198 #ifdef _WIN32
00199 typedef WSAEVENT FD; // on Windows the stored `fd` member is a WSAEVENT ...
00200 typedef SOCKET FDType; // ... while the constructor argument is a SOCKET
00201 SOCKET socket; // Windows-only member; presumably the socket passed to the constructor -- confirm
00202 #else
00203 typedef int FD; // on other platforms both types are plain int
00204 typedef int FDType;
00205 #endif
00206
00207 PAggregatorFD(FDType fd); // defined out of line
00208
00209 FD fd;
00210 // NOTE(review): a destructor is declared but no copy operations; whether copies are safe depends on the out-of-line destructor -- confirm.
00211 ~PAggregatorFD();
00212 bool IsValid(); // defined out of line; validity criterion is not visible here
00213 };
00214
00215 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t;
00216
00218
00219
00220
00221
00222
00223
00224
00225 #ifdef _MSC_VER
00226 #pragma warning(push)
00227 #pragma warning(disable:4127)
00228 #endif
00229 // Abstract unit of work for PHandleAggregator: exposes descriptors via GetFDs() and read callbacks.
00230 class PAggregatedHandle : public PObject
00231 {
00232     PCLASSINFO(PAggregatedHandle, PObject);
00233   public:
00234     // The parameter is named autoDeleteFlag because `auto` is a reserved keyword, not a valid identifier.
00235     PAggregatedHandle(PBoolean autoDeleteFlag = false)
00236       : autoDelete(autoDeleteFlag)
00237       , closed(false)
00238       , beingProcessed(false)
00239       , preReadDone(false) { }
00240     // Descriptors exposed by this handle; every subclass must provide them.
00241     virtual PAggregatorFDList_t GetFDs() = 0;
00242     // Defaults to PMaxTimeInterval; override to change it.
00243     virtual PTimeInterval GetTimeout()
00244     { return PMaxTimeInterval; }
00245     // Hooks with defaults: Init/PreRead report PTrue, DeInit/OnClose do nothing; OnRead is mandatory.
00246     virtual PBoolean Init() { return PTrue; }
00247     virtual PBoolean PreRead() { return PTrue; }
00248     virtual PBoolean OnRead() = 0;
00249     virtual void DeInit() { }
00250     virtual void OnClose() { }
00251     // Accessors for the protected preReadDone flag.
00252     virtual PBoolean IsPreReadDone() const
00253     { return preReadDone; }
00254
00255     virtual void SetPreReadDone(PBoolean v = PTrue)
00256     { preReadDone = v; }
00257     // Public state flags; the constructor clears all of them except autoDelete.
00258     PBoolean autoDelete;      // presumably lets the aggregator delete the handle -- confirm against the implementation
00259     PBoolean closed;
00260     PBoolean beingProcessed;
00261
00262   protected:
00263     PBoolean preReadDone;
00264 };
00265
00266 #ifdef _MSC_VER
00267 #pragma warning(pop)
00268 #endif
00269
00270
00272
00273
00274
00275
00276 #ifdef _WIN32
00277 // Wraps a Win32 event object: created in the constructor, closed in the destructor.
00278 class EventBase
00279 {
00280   public:
00281     // CreateEvent arguments: no security attributes, manual reset (PTrue),
00282     // initially non-signalled (PFalse), unnamed.
00283     EventBase()
00284       : event(::CreateEvent(NULL, PTrue, PFalse, NULL))
00285     { PAssert(event != NULL, "CreateEvent failed"); }
00286
00287     // NOTE(review): no copy operations are declared, so a copied object would close the same handle again.
00288     ~EventBase() { ::CloseHandle(event); }
00289     // The raw handle, converted to the aggregator's descriptor type.
00290     PAggregatorFD::FD GetHandle()
00291     { return (PAggregatorFD::FD)event; }
00292     // Put the event into the signalled state.
00293     void Set() { ::SetEvent(event); }
00294
00295     // Return the event to the non-signalled state.
00296     void Reset() { ::ResetEvent(event); }
00297
00298
00299   protected:
00300     HANDLE event;
00301 };
00302
00303 #endif
00304
00305 typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t;
00306 // Pool worker used by PHandleAggregator; keeps its PAggregatedHandle pointers in handleList.
00307 class PAggregatorWorker : public PThreadPoolWorkerBase
00308 {
00309 public:
00310 PAggregatorWorker(PThreadPoolBase & pool);
00311
00312 unsigned GetWorkSize() const; // implements the pure virtual from PThreadPoolWorkerBase
00313 void Shutdown(); // implements the pure virtual from PThreadPoolWorkerBase
00314
00315 void OnAddWork(PAggregatedHandle *); // invoked by PThreadPool::AddWork after the handle is mapped to this worker
00316 void OnRemoveWork(PAggregatedHandle *); // invoked by PThreadPool::RemoveWork after the handle is unmapped
00317
00318 void Main(); // presumably the PThread entry point -- confirm against PThread
00319 PAggregatedHandleList_t handleList;
00320 // NOTE(review): localEvent is not declared in this class or in PThreadPoolWorkerBase as shown in this file -- confirm where it is meant to come from.
00321 void Trigger() { localEvent.Set(); }
00322
00323 PBoolean listChanged; // presumably flags that handleList was modified -- confirm
00324 };
00325
00326 typedef PThreadPool<PAggregatedHandle, PAggregatorWorker> PHandleAggregatorBase;
00327 // Concrete aggregator: a PThreadPool of PAggregatorWorker threads working on PAggregatedHandle units.
00328 class PHandleAggregator : public PHandleAggregatorBase
00329 {
00330 PCLASSINFO(PHandleAggregator, PHandleAggregatorBase)
00331 public:
00332 typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t; // same type as the namespace-scope typedef of this name
00333
00334 PHandleAggregator(unsigned maximum = 10); // `maximum` defaults to 10, matching the PThreadPool default
00335
00336 PBoolean AddHandle(PAggregatedHandle * handle); // defined out of line; presumably forwards to AddWork -- confirm
00337
00338 PBoolean RemoveHandle(PAggregatedHandle * handle); // defined out of line; presumably forwards to RemoveWork -- confirm
00339 };
00340
00341
00343
00344
00345
00346
00347
00348 #if 0
00349 // Adapts sockets of type PSocketType to PHandleAggregator, one AggregatedPSocket per socket.
00350 template <class PSocketType>
00351 class PSocketAggregator : public PHandleAggregator
00352 {
00353     PCLASSINFO(PSocketAggregator, PHandleAggregator)
00354   public:
00355     class AggregatedPSocket : public PAggregatedHandle
00356     {
00357       public:
00358         AggregatedPSocket(PSocketType * s)
00359           : psocket(s), fd(s->GetHandle()) { }
00360         // Forwards the read notification to the wrapped socket.
00361         PBoolean OnRead()
00362         { return psocket->OnRead(); }
00363         // The returned list points at the member `fd`, so it is valid only while this object lives.
00364         PAggregatorFDList_t GetFDs()
00365         { PAggregatorFDList_t list; list.push_back(&fd); return list; }
00366
00367       protected:
00368         PSocketType * psocket;
00369         PAggregatorFD fd;
00370     };
00371
00372     typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t;
00373     SocketList_t socketList;
00374     // Wraps sock in a new handle and registers it; the handle is deleted again if AddHandle fails.
00375     PBoolean AddSocket(PSocketType * sock)
00376     {
00377       PWaitAndSignal m(listMutex);
00378
00379       AggregatedPSocket * handle = new AggregatedPSocket(sock);
00380       if (AddHandle(handle)) {
00381         // `typename` is required: SocketList_t depends on the template parameter PSocketType.
00382         socketList.insert(typename SocketList_t::value_type(sock, handle));
00383         return PTrue;
00384       }
00385       delete handle;
00386       return PFalse;
00387     }
00388     // Unregisters and deletes the handle created for sock; returns PFalse if sock is not in socketList.
00389     PBoolean RemoveSocket(PSocketType * sock)
00390     {
00391       PWaitAndSignal m(listMutex);
00392
00393       typename SocketList_t::iterator r = socketList.find(sock);
00394       if (r == socketList.end())
00395         return PFalse;
00396
00397       AggregatedPSocket * handle = r->second;
00398       RemoveHandle(handle);
00399       delete handle;
00400       socketList.erase(r);
00401       return PTrue;
00402     }
00403 };
00404 #endif // #if 0
00405
00406 #endif
00407
00408
00409
00410 #endif // PTLIB_SOCKAGG_H
00411
00412
00413