Blender  V2.59
threads.c
Go to the documentation of this file.
00001 /*
00002  *
00003  * $Id: threads.c 35246 2011-02-27 20:37:56Z jesterking $
00004  *
00005  * ***** BEGIN GPL LICENSE BLOCK *****
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License
00009  * as published by the Free Software Foundation; either version 2
00010  * of the License, or (at your option) any later version. 
00011  *
00012  * This program is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with this program; if not, write to the Free Software Foundation,
00019  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
00020  *
00021  * The Original Code is Copyright (C) 2006 Blender Foundation
00022  * All rights reserved.
00023  *
00024  * The Original Code is: all of this file.
00025  *
00026  * Contributor(s): none yet.
00027  *
00028  * ***** END GPL LICENSE BLOCK *****
00029  */
00030 
00036 #include <errno.h>
00037 #include <string.h>
00038 
00039 #include "MEM_guardedalloc.h"
00040 
00041 
00042 #include "BLI_blenlib.h"
00043 #include "BLI_gsqueue.h"
00044 #include "BLI_threads.h"
00045 
00046 #include "PIL_time.h"
00047 
00048 /* for checking system threads - BLI_system_thread_count */
00049 #ifdef WIN32
00050 #include "windows.h"
00051 #include <sys/timeb.h>
00052 #elif defined(__APPLE__)
00053 #include <sys/types.h>
00054 #include <sys/sysctl.h>
00055 #else
00056 #include <unistd.h> 
00057 #include <sys/time.h>
00058 #endif
00059 
#if defined(__APPLE__) && (PARALLEL == 1) && (__GNUC__ == 4) && (__GNUC_MINOR__ == 2)
/* libgomp (Apple gcc 4.2.1) TLS bug workaround.
 * thread_tls_data keeps the value read from gomp_tls_key in
 * BLI_init_threads(), so tslot_thread_start() can set it again inside
 * every thread that gets started. */
static void *thread_tls_data;
extern pthread_key_t gomp_tls_key;
#endif
00065 
00066 /* ********** basic thread control API ************ 
00067 
00068 Many thread cases have an X amount of jobs, and only an Y amount of
00069 threads are useful (typically amount of cpus)
00070 
00071 This code can be used to start a maximum amount of 'thread slots', which
00072 then can be filled in a loop with an idle timer. 
00073 
00074 A sample loop can look like this (pseudo c);
00075 
00076         ListBase lb;
00077         int maxthreads= 2;
00078         int cont= 1;
00079 
00080         BLI_init_threads(&lb, do_something_func, maxthreads);
00081 
00082         while(cont) {
00083                 if(BLI_available_threads(&lb) && !(escape loop event)) {
00084                         // get new job (data pointer)
00085                         // tag job 'processed'
00086                         BLI_insert_thread(&lb, job);
00087                 }
00088                 else PIL_sleep_ms(50);
00089                 
00090                 // find out if a job is ready; do_something_func() should flag this somewhere in the job
00091                 cont= 0;
00092                 for(go over all jobs) {
00093                         if(job is ready) {
00094                                 if(job was not removed) {
00095                                         BLI_remove_thread(&lb, job);
00096                                 }
00097                         }
00098                         else cont= 1;
00099                 }
00100                 // conditions to exit loop 
00101                 if(escape loop event) {
00102                         if(BLI_available_threads(&lb)==maxthreads)
00103                                 break;
00104                 }
00105         }
00106 
00107         BLI_end_threads(&lb);
00108 
00109  ************************************************ */
/* Mutex locked and unlocked by the callbacks handed to
 * MEM_set_lock_callback(). */
static pthread_mutex_t _malloc_lock = PTHREAD_MUTEX_INITIALIZER;

/* Global mutexes, one per LOCK_* type accepted by
 * BLI_lock_thread() and BLI_unlock_thread(). */
static pthread_mutex_t _image_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _preview_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _viewer_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _custom1_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _rcache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _opengl_lock = PTHREAD_MUTEX_INITIALIZER;

/* Thread that called BLI_threadapi_init(); compared against in
 * BLI_thread_is_main(). */
static pthread_t mainid;

/* Incremented by BLI_init_threads() and decremented by BLI_end_threads();
 * threads can be invoked inside threads. */
static int thread_levels = 0;

/* Upper bound applied to thread counts, just a max for security reasons. */
#define RE_MAX_THREAD BLENDER_MAX_THREADS
00122 
/* One entry of a thread list: a place where a single thread can run. */
typedef struct ThreadSlot {
	/* list links, slots are appended with BLI_addtail() */
	struct ThreadSlot *next;
	struct ThreadSlot *prev;
	/* function run by the thread started for this slot */
	void *(*do_thread)(void *);
	/* argument passed to do_thread */
	void *callerdata;
	/* handle filled in by pthread_create() */
	pthread_t pthread;
	/* 1 while the slot is free, 0 while a thread occupies it */
	int avail;
} ThreadSlot;
00130 
00131 static void BLI_lock_malloc_thread(void)
00132 {
00133         pthread_mutex_lock(&_malloc_lock);
00134 }
00135 
00136 static void BLI_unlock_malloc_thread(void)
00137 {
00138         pthread_mutex_unlock(&_malloc_lock);
00139 }
00140 
00141 void BLI_threadapi_init(void)
00142 {
00143         mainid = pthread_self();
00144 }
00145 
00146 /* tot = 0 only initializes malloc mutex in a safe way (see sequence.c)
00147    problem otherwise: scene render will kill of the mutex!
00148 */
00149 
00150 void BLI_init_threads(ListBase *threadbase, void *(*do_thread)(void *), int tot)
00151 {
00152         int a;
00153 
00154         if(threadbase != NULL && tot > 0) {
00155                 threadbase->first= threadbase->last= NULL;
00156         
00157                 if(tot>RE_MAX_THREAD) tot= RE_MAX_THREAD;
00158                 else if(tot<1) tot= 1;
00159         
00160                 for(a=0; a<tot; a++) {
00161                         ThreadSlot *tslot= MEM_callocN(sizeof(ThreadSlot), "threadslot");
00162                         BLI_addtail(threadbase, tslot);
00163                         tslot->do_thread= do_thread;
00164                         tslot->avail= 1;
00165                 }
00166         }
00167         
00168         if(thread_levels == 0) {
00169                 MEM_set_lock_callback(BLI_lock_malloc_thread, BLI_unlock_malloc_thread);
00170 
00171 #if defined(__APPLE__) && (PARALLEL == 1) && (__GNUC__ == 4) && (__GNUC_MINOR__ == 2)
00172                 /* workaround for Apple gcc 4.2.1 omp vs background thread bug,
00173                    we copy gomp thread local storage pointer to setting it again
00174                    inside the thread that we start */
00175                 thread_tls_data = pthread_getspecific(gomp_tls_key);
00176 #endif
00177         }
00178 
00179         thread_levels++;
00180 }
00181 
00182 /* amount of available threads */
00183 int BLI_available_threads(ListBase *threadbase)
00184 {
00185         ThreadSlot *tslot;
00186         int counter=0;
00187         
00188         for(tslot= threadbase->first; tslot; tslot= tslot->next) {
00189                 if(tslot->avail)
00190                         counter++;
00191         }
00192         return counter;
00193 }
00194 
00195 /* returns thread number, for sample patterns or threadsafe tables */
00196 int BLI_available_thread_index(ListBase *threadbase)
00197 {
00198         ThreadSlot *tslot;
00199         int counter=0;
00200         
00201         for(tslot= threadbase->first; tslot; tslot= tslot->next, counter++) {
00202                 if(tslot->avail)
00203                         return counter;
00204         }
00205         return 0;
00206 }
00207 
00208 static void *tslot_thread_start(void *tslot_p)
00209 {
00210         ThreadSlot *tslot= (ThreadSlot*)tslot_p;
00211 
00212 #if defined(__APPLE__) && (PARALLEL == 1) && (__GNUC__ == 4) && (__GNUC_MINOR__ == 2)
00213         /* workaround for Apple gcc 4.2.1 omp vs background thread bug,
00214            set gomp thread local storage pointer which was copied beforehand */
00215         pthread_setspecific (gomp_tls_key, thread_tls_data);
00216 #endif
00217 
00218         return tslot->do_thread(tslot->callerdata);
00219 }
00220 
00221 int BLI_thread_is_main(void) {
00222         return pthread_equal(pthread_self(), mainid);
00223 }
00224 
00225 void BLI_insert_thread(ListBase *threadbase, void *callerdata)
00226 {
00227         ThreadSlot *tslot;
00228         
00229         for(tslot= threadbase->first; tslot; tslot= tslot->next) {
00230                 if(tslot->avail) {
00231                         tslot->avail= 0;
00232                         tslot->callerdata= callerdata;
00233                         pthread_create(&tslot->pthread, NULL, tslot_thread_start, tslot);
00234                         return;
00235                 }
00236         }
00237         printf("ERROR: could not insert thread slot\n");
00238 }
00239 
00240 void BLI_remove_thread(ListBase *threadbase, void *callerdata)
00241 {
00242         ThreadSlot *tslot;
00243         
00244         for(tslot= threadbase->first; tslot; tslot= tslot->next) {
00245                 if(tslot->callerdata==callerdata) {
00246                         pthread_join(tslot->pthread, NULL);
00247                         tslot->callerdata= NULL;
00248                         tslot->avail= 1;
00249                 }
00250         }
00251 }
00252 
00253 void BLI_remove_thread_index(ListBase *threadbase, int index)
00254 {
00255         ThreadSlot *tslot;
00256         int counter=0;
00257         
00258         for(tslot = threadbase->first; tslot; tslot = tslot->next, counter++) {
00259                 if (counter == index && tslot->avail == 0) {
00260                         pthread_join(tslot->pthread, NULL);
00261                         tslot->callerdata = NULL;
00262                         tslot->avail = 1;
00263                         break;
00264                 }
00265         }
00266 }
00267 
00268 void BLI_remove_threads(ListBase *threadbase)
00269 {
00270         ThreadSlot *tslot;
00271         
00272         for(tslot = threadbase->first; tslot; tslot = tslot->next) {
00273                 if (tslot->avail == 0) {
00274                         pthread_join(tslot->pthread, NULL);
00275                         tslot->callerdata = NULL;
00276                         tslot->avail = 1;
00277                 }
00278         }
00279 }
00280 
00281 void BLI_end_threads(ListBase *threadbase)
00282 {
00283         ThreadSlot *tslot;
00284         
00285         /* only needed if there's actually some stuff to end
00286          * this way we don't end up decrementing thread_levels on an empty threadbase 
00287          * */
00288         if (threadbase && threadbase->first != NULL) {
00289                 for(tslot= threadbase->first; tslot; tslot= tslot->next) {
00290                         if(tslot->avail==0) {
00291                                 pthread_join(tslot->pthread, NULL);
00292                         }
00293                 }
00294                 BLI_freelistN(threadbase);
00295         }
00296 
00297         thread_levels--;
00298         if(thread_levels==0)
00299                 MEM_set_lock_callback(NULL, NULL);
00300 }
00301 
00302 /* System Information */
00303 
00304 /* how many threads are native on this system? */
00305 int BLI_system_thread_count( void )
00306 {
00307         int t;
00308 #ifdef WIN32
00309         SYSTEM_INFO info;
00310         GetSystemInfo(&info);
00311         t = (int) info.dwNumberOfProcessors;
00312 #else 
00313 #       ifdef __APPLE__
00314         int mib[2];
00315         size_t len;
00316         
00317         mib[0] = CTL_HW;
00318         mib[1] = HW_NCPU;
00319         len = sizeof(t);
00320         sysctl(mib, 2, &t, &len, NULL, 0);
00321 #       elif defined(__sgi)
00322         t = sysconf(_SC_NPROC_ONLN);
00323 #       else
00324         t = (int)sysconf(_SC_NPROCESSORS_ONLN);
00325 #       endif
00326 #endif
00327         
00328         if (t>RE_MAX_THREAD)
00329                 return RE_MAX_THREAD;
00330         if (t<1)
00331                 return 1;
00332         
00333         return t;
00334 }
00335 
00336 /* Global Mutex Locks */
00337 
00338 void BLI_lock_thread(int type)
00339 {
00340         if (type==LOCK_IMAGE)
00341                 pthread_mutex_lock(&_image_lock);
00342         else if (type==LOCK_PREVIEW)
00343                 pthread_mutex_lock(&_preview_lock);
00344         else if (type==LOCK_VIEWER)
00345                 pthread_mutex_lock(&_viewer_lock);
00346         else if (type==LOCK_CUSTOM1)
00347                 pthread_mutex_lock(&_custom1_lock);
00348         else if (type==LOCK_RCACHE)
00349                 pthread_mutex_lock(&_rcache_lock);
00350         else if (type==LOCK_OPENGL)
00351                 pthread_mutex_lock(&_opengl_lock);
00352 }
00353 
00354 void BLI_unlock_thread(int type)
00355 {
00356         if (type==LOCK_IMAGE)
00357                 pthread_mutex_unlock(&_image_lock);
00358         else if (type==LOCK_PREVIEW)
00359                 pthread_mutex_unlock(&_preview_lock);
00360         else if (type==LOCK_VIEWER)
00361                 pthread_mutex_unlock(&_viewer_lock);
00362         else if(type==LOCK_CUSTOM1)
00363                 pthread_mutex_unlock(&_custom1_lock);
00364         else if(type==LOCK_RCACHE)
00365                 pthread_mutex_unlock(&_rcache_lock);
00366         else if(type==LOCK_OPENGL)
00367                 pthread_mutex_unlock(&_opengl_lock);
00368 }
00369 
00370 /* Mutex Locks */
00371 
00372 void BLI_mutex_init(ThreadMutex *mutex)
00373 {
00374         pthread_mutex_init(mutex, NULL);
00375 }
00376 
00377 void BLI_mutex_lock(ThreadMutex *mutex)
00378 {
00379         pthread_mutex_lock(mutex);
00380 }
00381 
00382 void BLI_mutex_unlock(ThreadMutex *mutex)
00383 {
00384         pthread_mutex_unlock(mutex);
00385 }
00386 
00387 void BLI_mutex_end(ThreadMutex *mutex)
00388 {
00389         pthread_mutex_destroy(mutex);
00390 }
00391 
00392 /* Read/Write Mutex Lock */
00393 
00394 void BLI_rw_mutex_init(ThreadRWMutex *mutex)
00395 {
00396         pthread_rwlock_init(mutex, NULL);
00397 }
00398 
00399 void BLI_rw_mutex_lock(ThreadRWMutex *mutex, int mode)
00400 {
00401         if(mode == THREAD_LOCK_READ)
00402                 pthread_rwlock_rdlock(mutex);
00403         else
00404                 pthread_rwlock_wrlock(mutex);
00405 }
00406 
00407 void BLI_rw_mutex_unlock(ThreadRWMutex *mutex)
00408 {
00409         pthread_rwlock_unlock(mutex);
00410 }
00411 
00412 void BLI_rw_mutex_end(ThreadRWMutex *mutex)
00413 {
00414         pthread_rwlock_destroy(mutex);
00415 }
00416 
00417 /* ************************************************ */
00418 
00419 typedef struct ThreadedWorker {
00420         ListBase threadbase;
00421         void *(*work_fnct)(void *);
00422         char     busy[RE_MAX_THREAD];
00423         int              total;
00424         int              sleep_time;
00425 } ThreadedWorker;
00426 
00427 typedef struct WorkParam {
00428         ThreadedWorker *worker;
00429         void *param;
00430         int       index;
00431 } WorkParam;
00432 
00433 static void *exec_work_fnct(void *v_param)
00434 {
00435         WorkParam *p = (WorkParam*)v_param;
00436         void *value;
00437         
00438         value = p->worker->work_fnct(p->param);
00439         
00440         p->worker->busy[p->index] = 0;
00441         MEM_freeN(p);
00442         
00443         return value;
00444 }
00445 
00446 ThreadedWorker *BLI_create_worker(void *(*do_thread)(void *), int tot, int sleep_time)
00447 {
00448         ThreadedWorker *worker;
00449         
00450         (void)sleep_time; /* unused */
00451         
00452         worker = MEM_callocN(sizeof(ThreadedWorker), "threadedworker");
00453         
00454         if (tot > RE_MAX_THREAD)
00455         {
00456                 tot = RE_MAX_THREAD;
00457         }
00458         else if (tot < 1)
00459         {
00460                 tot= 1;
00461         }
00462         
00463         worker->total = tot;
00464         worker->work_fnct = do_thread;
00465         
00466         BLI_init_threads(&worker->threadbase, exec_work_fnct, tot);
00467         
00468         return worker;
00469 }
00470 
00471 void BLI_end_worker(ThreadedWorker *worker)
00472 {
00473         BLI_remove_threads(&worker->threadbase);
00474 }
00475 
00476 void BLI_destroy_worker(ThreadedWorker *worker)
00477 {
00478         BLI_end_worker(worker);
00479         BLI_freelistN(&worker->threadbase);
00480         MEM_freeN(worker);
00481 }
00482 
00483 void BLI_insert_work(ThreadedWorker *worker, void *param)
00484 {
00485         WorkParam *p = MEM_callocN(sizeof(WorkParam), "workparam");
00486         int index;
00487         
00488         if (BLI_available_threads(&worker->threadbase) == 0)
00489         {
00490                 index = worker->total;
00491                 while(index == worker->total)
00492                 {
00493                         PIL_sleep_ms(worker->sleep_time);
00494                         
00495                         for (index = 0; index < worker->total; index++)
00496                         {
00497                                 if (worker->busy[index] == 0)
00498                                 {
00499                                         BLI_remove_thread_index(&worker->threadbase, index);
00500                                         break;
00501                                 }
00502                         }
00503                 }
00504         }
00505         else
00506         {
00507                 index = BLI_available_thread_index(&worker->threadbase);
00508         }
00509         
00510         worker->busy[index] = 1;
00511         
00512         p->param = param;
00513         p->index = index;
00514         p->worker = worker;
00515         
00516         BLI_insert_thread(&worker->threadbase, p);
00517 }
00518 
00519 /* ************************************************ */
00520 
00521 struct ThreadQueue {
00522         GSQueue *queue;
00523         pthread_mutex_t mutex;
00524         pthread_cond_t cond;
00525         int nowait;
00526 };
00527 
00528 ThreadQueue *BLI_thread_queue_init(void)
00529 {
00530         ThreadQueue *queue;
00531 
00532         queue= MEM_callocN(sizeof(ThreadQueue), "ThreadQueue");
00533         queue->queue= BLI_gsqueue_new(sizeof(void*));
00534 
00535         pthread_mutex_init(&queue->mutex, NULL);
00536         pthread_cond_init(&queue->cond, NULL);
00537 
00538         return queue;
00539 }
00540 
00541 void BLI_thread_queue_free(ThreadQueue *queue)
00542 {
00543         pthread_cond_destroy(&queue->cond);
00544         pthread_mutex_destroy(&queue->mutex);
00545 
00546         BLI_gsqueue_free(queue->queue);
00547 
00548         MEM_freeN(queue);
00549 }
00550 
00551 void BLI_thread_queue_push(ThreadQueue *queue, void *work)
00552 {
00553         pthread_mutex_lock(&queue->mutex);
00554 
00555         BLI_gsqueue_push(queue->queue, &work);
00556 
00557         /* signal threads waiting to pop */
00558         pthread_cond_signal(&queue->cond);
00559         pthread_mutex_unlock(&queue->mutex);
00560 }
00561 
00562 void *BLI_thread_queue_pop(ThreadQueue *queue)
00563 {
00564         void *work= NULL;
00565 
00566         /* wait until there is work */
00567         pthread_mutex_lock(&queue->mutex);
00568         while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
00569                 pthread_cond_wait(&queue->cond, &queue->mutex);
00570 
00571         /* if we have something, pop it */
00572         if(!BLI_gsqueue_is_empty(queue->queue))
00573                 BLI_gsqueue_pop(queue->queue, &work);
00574 
00575         pthread_mutex_unlock(&queue->mutex);
00576 
00577         return work;
00578 }
00579 
/* Fill timeout with the absolute wall-clock time that lies ms milliseconds
 * from now, in the form pthread_cond_timedwait() expects.
 *
 * timeout: receives the result; tv_nsec always ends up in [0, 1e9).
 * ms:      offset in milliseconds, a negative value gives a time in the past.
 */
static void wait_timeout(struct timespec *timeout, int ms)
{
	ldiv_t div_result;
	long sec, usec, x;

#ifdef WIN32
	{
		struct _timeb now;
		_ftime(&now);
		sec = now.time;
		usec = now.millitm*1000; /* microsecond precision would be better */
	}
#else
	{
		struct timeval now;
		gettimeofday(&now, NULL);
		sec = now.tv_sec;
		usec = now.tv_usec;
	}
#endif

	/* add current time + millisecond offset */
	div_result = ldiv(ms, 1000);
	timeout->tv_sec = sec + div_result.quot;

	x = usec + (div_result.rem*1000);

	if (x >= 1000000) {
		timeout->tv_sec++;
		x -= 1000000;
	}
	else if (x < 0) {
		/* a negative ms has a negative remainder: borrow one second so the
		   microsecond part does not become negative */
		timeout->tv_sec--;
		x += 1000000;
	}

	timeout->tv_nsec = x*1000;
}
00614 
00615 void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
00616 {
00617         double t;
00618         void *work= NULL;
00619         struct timespec timeout;
00620 
00621         t= PIL_check_seconds_timer();
00622         wait_timeout(&timeout, ms);
00623 
00624         /* wait until there is work */
00625         pthread_mutex_lock(&queue->mutex);
00626         while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
00627                 if(pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
00628                         break;
00629                 else if(PIL_check_seconds_timer() - t >= ms*0.001)
00630                         break;
00631         }
00632 
00633         /* if we have something, pop it */
00634         if(!BLI_gsqueue_is_empty(queue->queue))
00635                 BLI_gsqueue_pop(queue->queue, &work);
00636 
00637         pthread_mutex_unlock(&queue->mutex);
00638 
00639         return work;
00640 }
00641 
00642 int BLI_thread_queue_size(ThreadQueue *queue)
00643 {
00644         int size;
00645 
00646         pthread_mutex_lock(&queue->mutex);
00647         size= BLI_gsqueue_size(queue->queue);
00648         pthread_mutex_unlock(&queue->mutex);
00649 
00650         return size;
00651 }
00652 
00653 void BLI_thread_queue_nowait(ThreadQueue *queue)
00654 {
00655         pthread_mutex_lock(&queue->mutex);
00656 
00657         queue->nowait= 1;
00658 
00659         /* signal threads waiting to pop */
00660         pthread_cond_signal(&queue->cond);
00661         pthread_mutex_unlock(&queue->mutex);
00662 }
00663