Libav
|
00001 /* 00002 * Copyright (c) 2004 Roman Shaposhnik 00003 * 00004 * Many thanks to Steven M. Schultz for providing clever ideas and 00005 * to Michael Niedermayer <michaelni@gmx.at> for writing initial 00006 * implementation. 00007 * 00008 * This file is part of FFmpeg. 00009 * 00010 * FFmpeg is free software; you can redistribute it and/or 00011 * modify it under the terms of the GNU Lesser General Public 00012 * License as published by the Free Software Foundation; either 00013 * version 2.1 of the License, or (at your option) any later version. 00014 * 00015 * FFmpeg is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 * Lesser General Public License for more details. 00019 * 00020 * You should have received a copy of the GNU Lesser General Public 00021 * License along with FFmpeg; if not, write to the Free Software 00022 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 00023 */ 00024 #include <pthread.h> 00025 00026 #include "avcodec.h" 00027 00028 typedef int (action_func)(AVCodecContext *c, void *arg); 00029 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr); 00030 00031 typedef struct ThreadContext { 00032 pthread_t *workers; 00033 action_func *func; 00034 action_func2 *func2; 00035 void *args; 00036 int *rets; 00037 int rets_count; 00038 int job_count; 00039 int job_size; 00040 00041 pthread_cond_t last_job_cond; 00042 pthread_cond_t current_job_cond; 00043 pthread_mutex_t current_job_lock; 00044 int current_job; 00045 int done; 00046 } ThreadContext; 00047 00048 static void* attribute_align_arg worker(void *v) 00049 { 00050 AVCodecContext *avctx = v; 00051 ThreadContext *c = avctx->thread_opaque; 00052 int our_job = c->job_count; 00053 int thread_count = avctx->thread_count; 00054 int self_id; 00055 00056 pthread_mutex_lock(&c->current_job_lock); 00057 self_id = c->current_job++; 00058 for (;;){ 00059 while (our_job >= c->job_count) { 00060 if (c->current_job == thread_count + c->job_count) 00061 pthread_cond_signal(&c->last_job_cond); 00062 00063 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock); 00064 our_job = self_id; 00065 00066 if (c->done) { 00067 pthread_mutex_unlock(&c->current_job_lock); 00068 return NULL; 00069 } 00070 } 00071 pthread_mutex_unlock(&c->current_job_lock); 00072 00073 c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size): 00074 c->func2(avctx, c->args, our_job, self_id); 00075 00076 pthread_mutex_lock(&c->current_job_lock); 00077 our_job = c->current_job++; 00078 } 00079 } 00080 00081 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count) 00082 { 00083 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock); 00084 pthread_mutex_unlock(&c->current_job_lock); 00085 } 00086 00087 void avcodec_thread_free(AVCodecContext *avctx) 00088 { 00089 ThreadContext *c = avctx->thread_opaque; 00090 int i; 00091 00092 pthread_mutex_lock(&c->current_job_lock); 00093 c->done = 1; 00094 pthread_cond_broadcast(&c->current_job_cond); 00095 pthread_mutex_unlock(&c->current_job_lock); 00096 00097 for (i=0; i<avctx->thread_count; i++) 00098 pthread_join(c->workers[i], NULL); 00099 00100 pthread_mutex_destroy(&c->current_job_lock); 00101 pthread_cond_destroy(&c->current_job_cond); 00102 pthread_cond_destroy(&c->last_job_cond); 00103 av_free(c->workers); 00104 av_freep(&avctx->thread_opaque); 00105 } 00106 00107 static int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size) 00108 { 00109 ThreadContext *c= avctx->thread_opaque; 00110 int dummy_ret; 00111 00112 if (job_count <= 0) 00113 return 0; 00114 00115 pthread_mutex_lock(&c->current_job_lock); 00116 00117 c->current_job = avctx->thread_count; 00118 c->job_count = job_count; 00119 c->job_size = job_size; 00120 c->args = arg; 00121 c->func = func; 00122 if (ret) { 00123 c->rets = ret; 00124 c->rets_count = job_count; 00125 } else { 00126 c->rets = &dummy_ret; 00127 c->rets_count = 1; 00128 } 00129 pthread_cond_broadcast(&c->current_job_cond); 00130 00131 avcodec_thread_park_workers(c, avctx->thread_count); 00132 00133 return 0; 00134 } 00135 00136 static int avcodec_thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count) 00137 { 00138 ThreadContext *c= avctx->thread_opaque; 00139 c->func2 = func2; 00140 return avcodec_thread_execute(avctx, NULL, arg, ret, job_count, 0); 00141 } 00142 00143 int avcodec_thread_init(AVCodecContext *avctx, int thread_count) 00144 { 00145 int i; 00146 ThreadContext *c; 00147 00148 avctx->thread_count = thread_count; 00149 00150 if (thread_count <= 1) 00151 return 0; 00152 00153 c = av_mallocz(sizeof(ThreadContext)); 00154 if (!c) 00155 return -1; 00156 00157 c->workers = av_mallocz(sizeof(pthread_t)*thread_count); 00158 if (!c->workers) { 00159 av_free(c); 00160 return -1; 00161 } 00162 00163 avctx->thread_opaque = c; 00164 c->current_job = 0; 00165 c->job_count = 0; 00166 c->job_size = 0; 00167 c->done = 0; 00168 pthread_cond_init(&c->current_job_cond, NULL); 00169 pthread_cond_init(&c->last_job_cond, NULL); 00170 pthread_mutex_init(&c->current_job_lock, NULL); 00171 pthread_mutex_lock(&c->current_job_lock); 00172 for (i=0; i<thread_count; i++) { 00173 if(pthread_create(&c->workers[i], NULL, worker, avctx)) { 00174 avctx->thread_count = i; 00175 pthread_mutex_unlock(&c->current_job_lock); 00176 avcodec_thread_free(avctx); 00177 return -1; 00178 } 00179 } 00180 00181 avcodec_thread_park_workers(c, thread_count); 00182 00183 avctx->execute = avcodec_thread_execute; 00184 avctx->execute2 = avcodec_thread_execute2; 00185 return 0; 00186 }