00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "CSConfig.h"
00030
00031 #ifdef OS_WINDOWS
00032 #include <signal.h>
00033
00034 #define SIGUSR1 30
00035 #define SIGUSR2 31
00036
00037 #else
00038 #include <signal.h>
00039 #include <sys/signal.h>
00040 #include <unistd.h>
00041 #endif
00042 #include <errno.h>
00043
00044 #include "CSGlobal.h"
00045 #include "CSLog.h"
00046 #include "CSException.h"
00047 #include "CSThread.h"
00048 #include "CSStrUtil.h"
00049 #include "CSMemory.h"
00050
00051 #define PBMS_THREAD_SIG SIGUSR1
00052
00053
00054
00055
00056
00057 extern "C" {
00058
00059
00060 static void td_catch_signal(int sig)
00061 {
00062 CSThread *self;
00063
00064 if ((self = CSThread::getSelf())) {
00065 if (self->isMain()) {
00066
00067 if (self->myThreadList)
00068 self->myThreadList->signalAllThreads(sig);
00069 self->setSignalPending(sig);
00070 }
00071 }
00072
00073 }
00074
00075 static void td_throw_signal(int sig)
00076 {
00077 CSThread *self;
00078
00079 if ((self = CSThread::getSelf())) {
00080 if (self->isMain()) {
00081
00082 if (self->myThreadList)
00083 self->myThreadList->signalAllThreads(sig);
00084 }
00085 self->setSignalPending(sig);
00086 self->interrupted();
00087 }
00088 }
00089
00090 static bool td_setup_signals(CSThread *thread)
00091 {
00092 #ifdef OS_WINDOWS
00093 return true;
00094 #else
00095 struct sigaction action;
00096
00097 sigemptyset(&action.sa_mask);
00098 action.sa_flags = 0;
00099
00100 action.sa_handler = td_catch_signal;
00101
00102 if (sigaction(PBMS_THREAD_SIG, &action, NULL) == -1)
00103 goto error_occurred;
00104
00105 action.sa_handler = td_throw_signal;
00106
00107 return true;
00108
00109 error_occurred:
00110
00111 if (thread) {
00112 thread->myException.initOSError(CS_CONTEXT, errno);
00113 thread->myException.setStackTrace(thread);
00114 }
00115 else
00116 CSException::throwOSError(CS_CONTEXT, errno);
00117 return false;
00118 #endif
00119 }
00120
00121 }
00122
00123
00124
00125
00126
00127
00128 void CSThreadList::signalAllThreads(int sig)
00129 {
00130 CSThread *ptr;
00131
00132 enter_();
00133 lock_(this);
00134 ptr = (CSThread *) getBack();
00135 while (ptr) {
00136 if (ptr != self)
00137 ptr->signal(sig);
00138 ptr = (CSThread *) ptr->getNextLink();
00139 }
00140 unlock_(this);
00141
00142 exit_();
00143 }
00144
00145 void CSThreadList::quitAllThreads()
00146 {
00147 CSThread *ptr;
00148
00149 enter_();
00150 lock_(this);
00151
00152 ptr = (CSThread *) getBack();
00153 while (ptr) {
00154 if (ptr != self)
00155 ptr->myMustQuit = true;
00156 ptr = (CSThread *) ptr->getNextLink();
00157 }
00158
00159 unlock_(this);
00160 exit_();
00161 }
00162
00163 void CSThreadList::stopAllThreads()
00164 {
00165 CSThread *thread;
00166
00167 enter_();
00168 for (;;) {
00169
00170 lock_(this);
00171 if ((thread = (CSThread *) getBack())) {
00172 while (thread) {
00173 if (thread != self)
00174 break;
00175 thread = (CSThread *) thread->getNextLink();
00176 }
00177 }
00178 if (thread)
00179 thread->retain();
00180 unlock_(this);
00181
00182 if (!thread)
00183 break;
00184
00185 push_(thread);
00186 thread->stop();
00187 release_(thread);
00188 }
00189 exit_();
00190 }
00191
00192
00193
00194
00195
00196
00197 void CSThread::addToList()
00198 {
00199 if (myThreadList) {
00200 enter_();
00201 ASSERT(self == this);
00202 lock_(myThreadList);
00203 myThreadList->addFront(self);
00204 isRunning = true;
00205 unlock_(myThreadList);
00206 exit_();
00207 }
00208 else
00209 isRunning = true;
00210 }
00211
00212 void CSThread::removeFromList()
00213 {
00214 if (myThreadList && isRunning) {
00215 CSThread *myself = this;
00216 enter_();
00217
00218
00219
00220
00221
00222
00223 push_(myself);
00224 lock_(myThreadList);
00225 myThreadList->remove(RETAIN(myself));
00226 unlock_(myThreadList);
00227 pop_(myself);
00228 outer_();
00229 }
00230 this->release();
00231 }
00232
00233 void *CSThread::dispatch(void *arg)
00234 {
00235 CSThread *self;
00236 void *return_data = NULL;
00237 int err;
00238
00239
00240 self = reinterpret_cast<CSThread*>(arg);
00241 ASSERT(self);
00242
00243
00244 if ((err = pthread_setspecific(CSThread::sThreadKey, self))) {
00245 CSException::logOSError(self, CS_CONTEXT, err);
00246 return NULL;
00247 }
00248
00249
00250
00251
00252
00253 self->retain();
00254
00255 try_(a) {
00256 td_setup_signals(NULL);
00257
00258
00259 self->addToList();
00260
00261
00262 return_data = self->run();
00263 }
00264 catch_(a) {
00265 self->logException();
00266 }
00267 cont_(a);
00268
00269
00270
00271
00272 self->removeFromList();
00273
00274
00275 return return_data;
00276 }
00277
00278
00279 extern "C"
00280 {
00281
00282 static void *dispatch_wrapper(void *arg)
00283 {
00284 return CSThread::dispatch(arg);
00285 }
00286
00287 }
00288
00289 void *CSThread::run()
00290 {
00291 if (iRunFunc)
00292 return iRunFunc();
00293 return NULL;
00294 }
00295
00296 void CSThread::start(bool detached)
00297 {
00298 int err;
00299
00300 err = pthread_create(&iThread, NULL, dispatch_wrapper, (void *) this);
00301 if (err)
00302 CSException::throwOSError(CS_CONTEXT, err);
00303 while (!isRunning) {
00304
00305
00306
00307 if (pthread_kill(iThread, 0))
00308 break;
00309 usleep(10);
00310 }
00311
00312 isDetached = detached;
00313 if (detached)
00314 pthread_detach(iThread);
00315 }
00316
00317 void CSThread::stop()
00318 {
00319 signal(SIGTERM);
00320 join();
00321 }
00322
00323 void *CSThread::join()
00324 {
00325 void *return_data = NULL;
00326 int err;
00327
00328 enter_();
00329 if (isDetached) {
00330 while (isRunning && !pthread_kill(iThread, 0))
00331 usleep(100);
00332 } else if ((err = pthread_join(iThread, &return_data))) {
00333 CSException::throwOSError(CS_CONTEXT, err);
00334 }
00335
00336 return_(return_data);
00337 }
00338
00339 void CSThread::setSignalPending(unsigned int sig)
00340 {
00341 if (sig == SIGTERM)
00342
00343 signalPending = SIGTERM;
00344 else if (!signalPending)
00345
00346 signalPending = sig;
00347 }
00348
00349 void CSThread::signal(unsigned int sig)
00350 {
00351 #ifndef OS_WINDOWS // Currently you cannot signal threads on windows.
00352 int err;
00353
00354 setSignalPending(sig);
00355 if ((err = pthread_kill(iThread, PBMS_THREAD_SIG)))
00356 {
00357
00358 if (err != ESRCH)
00359 CSException::throwOSError(CS_CONTEXT, err);
00360 }
00361 #endif
00362 }
00363
00364 void CSThread::throwSignal()
00365 {
00366 int sig;
00367
00368 if ((sig = signalPending) && !ignoreSignals) {
00369 signalPending = 0;
00370 CSException::throwSignal(CS_CONTEXT, sig);
00371 }
00372 }
00373
00374 bool CSThread::isMain()
00375 {
00376 return iIsMain;
00377 }
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388 void CSThread::releaseObjects(CSReleasePtr top)
00389 {
00390 CSObject *obj;
00391
00392 while (relTop > top) {
00393
00394 relTop--;
00395 switch(relTop->r_type) {
00396 case CS_RELEASE_OBJECT:
00397 if ((obj = relTop->x.r_object))
00398 obj->release();
00399 break;
00400 case CS_RELEASE_MUTEX:
00401 if (relTop->x.r_mutex)
00402 relTop->x.r_mutex->unlock();
00403 break;
00404 case CS_RELEASE_POOLED:
00405 if (relTop->x.r_pooled)
00406 relTop->x.r_pooled->returnToPool();
00407 break;
00408 case CS_RELEASE_MEM:
00409 if (relTop->x.r_mem)
00410 cs_free(relTop->x.r_mem);
00411 break;
00412 case CS_RELEASE_OBJECT_PTR:
00413 if ((relTop->x.r_objectPtr) && (obj = *(relTop->x.r_objectPtr)))
00414 obj->release();
00415 break;
00416 }
00417 }
00418 }
00419
00420
00421 void CSThread::throwException()
00422 {
00423
00424 if (this->jumpDepth > 0 && this->jumpDepth <= CS_JUMP_STACK_SIZE) {
00425
00426
00427
00428
00429
00430
00431 releaseObjects(this->jumpEnv[this->jumpDepth-1].jb_res_top);
00432
00433
00434 longjmp(this->jumpEnv[this->jumpDepth-1].jb_buffer, 1);
00435 }
00436 }
00437
00438 void CSThread::logStack(int depth, const char *msg)
00439 {
00440 char buffer[CS_EXC_CONTEXT_SIZE +1];
00441 CSL.lock();
00442 CSL.log(this, CSLog::Trace, msg);
00443
00444 for (int i= callTop-1; i>=0 && depth; i--, depth--) {
00445 cs_format_context(CS_EXC_CONTEXT_SIZE, buffer,
00446 callStack[i].cs_func, callStack[i].cs_file, callStack[i].cs_line);
00447 strcat(buffer, "\n");
00448 CSL.log(this, CSLog::Trace, buffer);
00449 }
00450 CSL.unlock();
00451 }
00452
00453 void CSThread::logException()
00454 {
00455 myException.log(this);
00456 }
00457
00458
00459
00460
00461
00462
00463 void CSThread::caught()
00464 {
00465
00466 this->callTop = this->jumpEnv[this->jumpDepth].jb_call_top;
00467
00468
00469
00470
00471
00472 releaseObjects(this->jumpEnv[this->jumpDepth].jb_res_top);
00473 }
00474
00475
00476
00477
00478
00479
00480 pthread_key_t CSThread::sThreadKey;
00481 bool CSThread::isUp = false;
00482
00483 bool CSThread::startUp()
00484 {
00485 int err;
00486
00487 isUp = false;
00488 if ((err = pthread_key_create(&sThreadKey, NULL))) {
00489 CSException::logOSError(CS_CONTEXT, errno);
00490 } else
00491 isUp = true;
00492
00493 return isUp;
00494 }
00495
00496 void CSThread::shutDown()
00497 {
00498 isUp = false;
00499 }
00500
00501 bool CSThread::attach(CSThread *thread)
00502 {
00503 ASSERT(!getSelf());
00504
00505 if (!thread) {
00506 CSException::logOSError(CS_CONTEXT, ENOMEM);
00507 return false;
00508 }
00509
00510 if (!setSelf(thread))
00511 return false;
00512
00513
00514 if (!td_setup_signals(thread))
00515 return false;
00516
00517 thread->addToList();
00518 thread->retain();
00519 return true;
00520 }
00521
00522 void CSThread::detach(CSThread *thread)
00523 {
00524 ASSERT(!getSelf() || getSelf() == thread);
00525 thread->removeFromList();
00526 thread->release();
00527 pthread_setspecific(sThreadKey, NULL);
00528 }
00529
00530 CSThread* CSThread::getSelf()
00531 {
00532 CSThread* self = NULL;
00533
00534 if ((!isUp) || !(self = (CSThread*) pthread_getspecific(sThreadKey)))
00535 return (CSThread*) NULL;
00536
00537 #ifdef DEBUG
00538
00539
00540
00541
00542
00543
00544
00545
00546 #endif
00547
00548 return self;
00549 }
00550
00551 bool CSThread::setSelf(CSThread *self)
00552 {
00553 int err;
00554
00555 if (self) {
00556 self->iThread = pthread_self();
00557
00558
00559 if ((err = pthread_setspecific(sThreadKey, self))) {
00560 self->myException.initOSError(CS_CONTEXT, err);
00561 self->myException.setStackTrace(self);
00562 return false;
00563 }
00564 }
00565 else
00566 pthread_setspecific(sThreadKey, NULL);
00567 return true;
00568 }
00569
00570
00571 void CSThread::sleep(unsigned long timeout)
00572 {
00573 enter_();
00574 usleep(timeout * 1000);
00575 self->interrupted();
00576 exit_();
00577 }
00578
00579 #ifdef DEBUG
00580 int cs_assert(const char *func, const char *file, int line, const char *message)
00581 {
00582 CSException::throwAssertion(func, file, line, message);
00583 return 0;
00584 }
00585
00586 int cs_hope(const char *func, const char *file, int line, const char *message)
00587 {
00588 CSException e;
00589
00590 e.initAssertion(func, file, line, message);
00591 e.log(NULL);
00592 return 0;
00593 }
00594 #endif
00595
00596 CSThread *CSThread::newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list)
00597 {
00598 CSThread *thd;
00599
00600 enter_();
00601 if (!(thd = new CSThread(list))) {
00602 name->release();
00603 CSException::throwOSError(CS_CONTEXT, ENOMEM);
00604 }
00605 thd->threadName = name;
00606 thd->iRunFunc = run_func;
00607 return_(thd);
00608 }
00609
00610 CSThread *CSThread::newCSThread()
00611 {
00612 CSThread *thd = NULL;
00613
00614 if (!(thd = new CSThread(NULL))) {
00615 CSException::throwOSError(CS_CONTEXT, ENOMEM);
00616 }
00617
00618 return thd;
00619 }
00620
00621
00622
00623
00624
00625
00626 CSDaemon::CSDaemon(time_t wait_time, CSThreadList *list):
00627 CSThread(list),
00628 CSSync(),
00629 myWaitTime(wait_time),
00630 iSuspended(false),
00631 iSuspendCount(0)
00632 {
00633 }
00634
00635 CSDaemon::CSDaemon(CSThreadList *list):
00636 CSThread(list),
00637 CSSync(),
00638 myWaitTime(0),
00639 iSuspended(false),
00640 iSuspendCount(0)
00641 {
00642 }
00643
00644 void CSDaemon::try_Run(CSThread *self, const bool c_must_sleep)
00645 {
00646 try_(a) {
00647 bool must_sleep = c_must_sleep;
00648 while (!myMustQuit) {
00649 if (must_sleep) {
00650 lock_(this);
00651 if (myWaitTime)
00652 suspendedWait(myWaitTime);
00653 else
00654 suspendedWait();
00655 unlock_(this);
00656 if (myMustQuit)
00657 break;
00658 }
00659 must_sleep = doWork();
00660 }
00661 }
00662 catch_(a) {
00663 if (!handleException())
00664 myMustQuit = true;
00665 }
00666 cont_(a);
00667 }
00668
00669 void *CSDaemon::run()
00670 {
00671 bool must_sleep = false;
00672
00673 enter_();
00674
00675 myMustQuit = !initializeWork();
00676
00677 while (!myMustQuit) {
00678 try_Run(self, must_sleep);
00679 must_sleep = true;
00680 }
00681
00682
00683 ignoreSignals = true;
00684
00685 return_(completeWork());
00686 }
00687
00688 bool CSDaemon::doWork()
00689 {
00690 if (iRunFunc)
00691 (void) iRunFunc();
00692 return true;
00693 }
00694
00695 bool CSDaemon::handleException()
00696 {
00697 if (!myMustQuit)
00698 logException();
00699 return true;
00700 }
00701
00702 void CSDaemon::wakeup()
00703 {
00704 CSSync::wakeup();
00705 }
00706
00707 void CSDaemon::stop()
00708 {
00709 myMustQuit = true;
00710 wakeup();
00711 signal(SIGTERM);
00712 join();
00713 }
00714
00715 void CSDaemon::suspend()
00716 {
00717 enter_();
00718 lock_(this);
00719 iSuspendCount++;
00720 while (!iSuspended && !myMustQuit)
00721 wait(500);
00722 if (!iSuspended)
00723 iSuspendCount--;
00724 unlock_(this);
00725 exit_();
00726 }
00727
00728 void CSDaemon::resume()
00729 {
00730 enter_();
00731 lock_(this);
00732 if (iSuspendCount > 0)
00733 iSuspendCount--;
00734 wakeup();
00735 unlock_(this);
00736 exit_();
00737 }
00738
00739 void CSDaemon::suspended()
00740 {
00741 if (!iSuspendCount || myMustQuit) {
00742 iSuspended = false;
00743 return;
00744 }
00745 enter_();
00746 lock_(this);
00747 while (iSuspendCount && !myMustQuit) {
00748 iSuspended = true;
00749 wait(500);
00750 }
00751 iSuspended = false;
00752 unlock_(this);
00753 exit_();
00754 }
00755
00756 void CSDaemon::suspendedWait()
00757 {
00758 iSuspended = true;
00759 wait();
00760 if (iSuspendCount)
00761 suspended();
00762 }
00763
00764 void CSDaemon::suspendedWait(time_t milli_sec)
00765 {
00766 iSuspended = true;
00767 wait(milli_sec);
00768 if (iSuspendCount)
00769 suspended();
00770 else
00771 iSuspended = false;
00772 }
00773
00774
00775
00776
00777
00778
00779