00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <pthread.h>
00025
00026 #include "avcodec.h"
00027
00028 typedef int (action_t)(AVCodecContext *c, void *arg);
00029
00030 typedef struct ThreadContext {
00031 pthread_t *workers;
00032 action_t *func;
00033 void **args;
00034 int *rets;
00035 int rets_count;
00036 int job_count;
00037
00038 pthread_cond_t last_job_cond;
00039 pthread_cond_t current_job_cond;
00040 pthread_mutex_t current_job_lock;
00041 int current_job;
00042 int done;
00043 } ThreadContext;
00044
00045 static void* attribute_align_arg worker(void *v)
00046 {
00047 AVCodecContext *avctx = v;
00048 ThreadContext *c = avctx->thread_opaque;
00049 int our_job = c->job_count;
00050 int thread_count = avctx->thread_count;
00051 int self_id;
00052
00053 pthread_mutex_lock(&c->current_job_lock);
00054 self_id = c->current_job++;
00055 for (;;){
00056 while (our_job >= c->job_count) {
00057 if (c->current_job == thread_count + c->job_count)
00058 pthread_cond_signal(&c->last_job_cond);
00059
00060 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
00061 our_job = self_id;
00062
00063 if (c->done) {
00064 pthread_mutex_unlock(&c->current_job_lock);
00065 return NULL;
00066 }
00067 }
00068 pthread_mutex_unlock(&c->current_job_lock);
00069
00070 c->rets[our_job%c->rets_count] = c->func(avctx, c->args[our_job]);
00071
00072 pthread_mutex_lock(&c->current_job_lock);
00073 our_job = c->current_job++;
00074 }
00075 }
00076
00077 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
00078 {
00079 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
00080 pthread_mutex_unlock(&c->current_job_lock);
00081 }
00082
00083 void avcodec_thread_free(AVCodecContext *avctx)
00084 {
00085 ThreadContext *c = avctx->thread_opaque;
00086 int i;
00087
00088 pthread_mutex_lock(&c->current_job_lock);
00089 c->done = 1;
00090 pthread_cond_broadcast(&c->current_job_cond);
00091 pthread_mutex_unlock(&c->current_job_lock);
00092
00093 for (i=0; i<avctx->thread_count; i++)
00094 pthread_join(c->workers[i], NULL);
00095
00096 pthread_mutex_destroy(&c->current_job_lock);
00097 pthread_cond_destroy(&c->current_job_cond);
00098 pthread_cond_destroy(&c->last_job_cond);
00099 av_free(c->workers);
00100 av_freep(&avctx->thread_opaque);
00101 }
00102
00103 int avcodec_thread_execute(AVCodecContext *avctx, action_t* func, void **arg, int *ret, int job_count)
00104 {
00105 ThreadContext *c= avctx->thread_opaque;
00106 int dummy_ret;
00107
00108 if (job_count <= 0)
00109 return 0;
00110
00111 pthread_mutex_lock(&c->current_job_lock);
00112
00113 c->current_job = avctx->thread_count;
00114 c->job_count = job_count;
00115 c->args = arg;
00116 c->func = func;
00117 if (ret) {
00118 c->rets = ret;
00119 c->rets_count = job_count;
00120 } else {
00121 c->rets = &dummy_ret;
00122 c->rets_count = 1;
00123 }
00124 pthread_cond_broadcast(&c->current_job_cond);
00125
00126 avcodec_thread_park_workers(c, avctx->thread_count);
00127
00128 return 0;
00129 }
00130
00131 int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
00132 {
00133 int i;
00134 ThreadContext *c;
00135
00136 c = av_mallocz(sizeof(ThreadContext));
00137 if (!c)
00138 return -1;
00139
00140 c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
00141 if (!c->workers) {
00142 av_free(c);
00143 return -1;
00144 }
00145
00146 avctx->thread_opaque = c;
00147 avctx->thread_count = thread_count;
00148 c->current_job = 0;
00149 c->job_count = 0;
00150 c->done = 0;
00151 pthread_cond_init(&c->current_job_cond, NULL);
00152 pthread_cond_init(&c->last_job_cond, NULL);
00153 pthread_mutex_init(&c->current_job_lock, NULL);
00154 pthread_mutex_lock(&c->current_job_lock);
00155 for (i=0; i<thread_count; i++) {
00156 if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
00157 avctx->thread_count = i;
00158 pthread_mutex_unlock(&c->current_job_lock);
00159 avcodec_thread_free(avctx);
00160 return -1;
00161 }
00162 }
00163
00164 avcodec_thread_park_workers(c, thread_count);
00165
00166 avctx->execute = avcodec_thread_execute;
00167 return 0;
00168 }