00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
00027 static void *_thread(void *data);
00028 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00029 static void _run(gearman_server_thread_st *thread, void *fn_arg);
00030
00031 static gearman_return_t _wakeup_init(gearmand_thread_st *thread);
00032 static void _wakeup_close(gearmand_thread_st *thread);
00033 static void _wakeup_clear(gearmand_thread_st *thread);
00034 static void _wakeup_event(int fd, short events, void *arg);
00035 static void _clear_events(gearmand_thread_st *thread);
00036
00039
00040
00041
00042
00043 gearman_return_t gearmand_thread_create(gearmand_st *gearmand)
00044 {
00045 gearmand_thread_st *thread;
00046 gearman_return_t ret;
00047 int pthread_ret;
00048
00049 thread= malloc(sizeof(gearmand_thread_st));
00050 if (thread == NULL)
00051 {
00052 gearmand_log_fatal(gearmand, "gearmand_thread_create:malloc");
00053 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00054 }
00055
00056 if (gearman_server_thread_create(&(gearmand->server),
00057 &(thread->server_thread)) == NULL)
00058 {
00059 free(thread);
00060 gearmand_log_fatal(gearmand, "gearmand_thread_create:gearman_server_thread_create:NULL");
00061 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00062 }
00063
00064 gearman_server_thread_set_log_fn(&(thread->server_thread), _log, thread,
00065 gearmand->verbose);
00066 gearman_server_thread_set_event_watch(&(thread->server_thread),
00067 gearmand_connection_watch, NULL);
00068
00069 thread->options= 0;
00070 thread->count= 0;
00071 thread->dcon_count= 0;
00072 thread->dcon_add_count= 0;
00073 thread->free_dcon_count= 0;
00074 thread->wakeup_fd[0]= -1;
00075 thread->wakeup_fd[1]= -1;
00076 GEARMAN_LIST_ADD(gearmand->thread, thread,)
00077 thread->gearmand= gearmand;
00078 thread->dcon_list= NULL;
00079 thread->dcon_add_list= NULL;
00080 thread->free_dcon_list= NULL;
00081
00082
00083
00084 if (gearmand->threads == 0)
00085 thread->base= gearmand->base;
00086 else
00087 {
00088 gearmand_log_info(gearmand, "Initializing libevent for IO thread");
00089
00090 thread->base= event_base_new();
00091 if (thread->base == NULL)
00092 {
00093 gearmand_thread_free(thread);
00094 gearmand_log_fatal(gearmand, "gearmand_thread_create:event_base_new:NULL");
00095 return GEARMAN_EVENT;
00096 }
00097 }
00098
00099 ret= _wakeup_init(thread);
00100 if (ret != GEARMAN_SUCCESS)
00101 {
00102 gearmand_thread_free(thread);
00103 return ret;
00104 }
00105
00106
00107 if (gearmand->threads == 0)
00108 return GEARMAN_SUCCESS;
00109
00110 thread->count= gearmand->thread_count;
00111
00112 pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
00113 if (pthread_ret != 0)
00114 {
00115 thread->count= 0;
00116 gearmand_thread_free(thread);
00117 gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_mutex_init:%d", pthread_ret);
00118 return GEARMAN_PTHREAD;
00119 }
00120
00121 thread->options|= GEARMAND_THREAD_LOCK;
00122
00123 gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
00124
00125 pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
00126 if (pthread_ret != 0)
00127 {
00128 thread->count= 0;
00129 gearmand_thread_free(thread);
00130 gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_create:%d", pthread_ret);
00131
00132 return GEARMAN_PTHREAD;
00133 }
00134
00135 gearmand_log_info(gearmand, "Thread %u created", thread->count);
00136
00137 return GEARMAN_SUCCESS;
00138 }
00139
00140 void gearmand_thread_free(gearmand_thread_st *thread)
00141 {
00142 gearmand_con_st *dcon;
00143
00144 if (thread->gearmand->threads && thread->count > 0)
00145 {
00146 gearmand_log_info(thread->gearmand, "Shutting down thread %u", thread->count);
00147
00148 gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
00149 (void) pthread_join(thread->id, NULL);
00150 }
00151
00152 if (thread->options & GEARMAND_THREAD_LOCK)
00153 (void) pthread_mutex_destroy(&(thread->lock));
00154
00155 _wakeup_close(thread);
00156
00157 while (thread->dcon_list != NULL)
00158 gearmand_con_free(thread->dcon_list);
00159
00160 while (thread->dcon_add_list != NULL)
00161 {
00162 dcon= thread->dcon_add_list;
00163 thread->dcon_add_list= dcon->next;
00164 close(dcon->fd);
00165 free(dcon);
00166 }
00167
00168 while (thread->free_dcon_list != NULL)
00169 {
00170 dcon= thread->free_dcon_list;
00171 thread->free_dcon_list= dcon->next;
00172 free(dcon);
00173 }
00174
00175 gearman_server_thread_free(&(thread->server_thread));
00176
00177 GEARMAN_LIST_DEL(thread->gearmand->thread, thread,)
00178
00179 if (thread->gearmand->threads > 0)
00180 {
00181 if (thread->base != NULL)
00182 event_base_free(thread->base);
00183
00184 gearmand_log_info(thread->gearmand, "Thread %u shutdown complete", thread->count);
00185 }
00186
00187 free(thread);
00188 }
00189
00190 void gearmand_thread_wakeup(gearmand_thread_st *thread,
00191 gearmand_wakeup_t wakeup)
00192 {
00193 uint8_t buffer= wakeup;
00194
00195
00196
00197 if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
00198 gearmand_log_error(thread->gearmand, "gearmand_thread_wakeup:write:%d", errno);
00199 }
00200
00201 void gearmand_thread_run(gearmand_thread_st *thread)
00202 {
00203 gearman_server_con_st *server_con;
00204 gearman_return_t ret;
00205 gearmand_con_st *dcon;
00206
00207 while (1)
00208 {
00209 server_con= gearman_server_thread_run(&(thread->server_thread), &ret);
00210 if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
00211 ret == GEARMAN_SHUTDOWN_GRACEFUL)
00212 {
00213 return;
00214 }
00215
00216 if (server_con == NULL)
00217 {
00218
00219
00220 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00221 return;
00222 }
00223
00224 dcon= (gearmand_con_st *)gearman_server_con_data(server_con);
00225
00226 gearmand_log_info(thread->gearmand, "[%4u] %15s:%5s Disconnected", thread->count, dcon->host, dcon->port);
00227
00228 gearmand_con_free(dcon);
00229 }
00230 }
00231
00232
00233
00234
00235
00236 static void *_thread(void *data)
00237 {
00238 gearmand_thread_st *thread= (gearmand_thread_st *)data;
00239
00240 gearmand_log_info(thread->gearmand, "[%4u] Entering thread event loop", thread->count);
00241
00242 if (event_base_loop(thread->base, 0) == -1)
00243 {
00244 gearmand_log_fatal(thread->gearmand, "_io_thread:event_base_loop:-1");
00245 thread->gearmand->ret= GEARMAN_EVENT;
00246 }
00247
00248 gearmand_log_info(thread->gearmand, "[%4u] Exiting thread event loop", thread->count);
00249
00250 return NULL;
00251 }
00252
00253 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00254 {
00255 gearmand_thread_st *dthread= (gearmand_thread_st *)context;
00256 char buffer[GEARMAN_MAX_ERROR_SIZE];
00257
00258 snprintf(buffer, GEARMAN_MAX_ERROR_SIZE, "[%4u] %s", dthread->count, line);
00259 (*dthread->gearmand->log_fn)(buffer, verbose,
00260 (void *)dthread->gearmand->log_context);
00261 }
00262
00263 static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
00264 void *fn_arg)
00265 {
00266 gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
00267 gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
00268 }
00269
00270 static gearman_return_t _wakeup_init(gearmand_thread_st *thread)
00271 {
00272 int ret;
00273
00274 gearmand_log_info(thread->gearmand, "Creating IO thread wakeup pipe");
00275
00276 ret= pipe(thread->wakeup_fd);
00277 if (ret == -1)
00278 {
00279 gearmand_log_fatal(thread->gearmand, "_wakeup_init:pipe:%d", errno);
00280 return GEARMAN_ERRNO;
00281 }
00282
00283 ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
00284 if (ret == -1)
00285 {
00286 gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
00287 return GEARMAN_ERRNO;
00288 }
00289
00290 ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00291 if (ret == -1)
00292 {
00293 gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
00294 return GEARMAN_ERRNO;
00295 }
00296
00297 event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
00298 _wakeup_event, thread);
00299 event_base_set(thread->base, &(thread->wakeup_event));
00300
00301 if (event_add(&(thread->wakeup_event), NULL) == -1)
00302 {
00303 gearmand_log_fatal(thread->gearmand, "_wakeup_init:event_add:-1");
00304 return GEARMAN_EVENT;
00305 }
00306
00307 thread->options|= GEARMAND_THREAD_WAKEUP_EVENT;
00308
00309 return GEARMAN_SUCCESS;
00310 }
00311
00312 static void _wakeup_close(gearmand_thread_st *thread)
00313 {
00314 _wakeup_clear(thread);
00315
00316 if (thread->wakeup_fd[0] >= 0)
00317 {
00318 gearmand_log_info(thread->gearmand, "Closing IO thread wakeup pipe");
00319 close(thread->wakeup_fd[0]);
00320 thread->wakeup_fd[0]= -1;
00321 close(thread->wakeup_fd[1]);
00322 thread->wakeup_fd[1]= -1;
00323 }
00324 }
00325
00326 static void _wakeup_clear(gearmand_thread_st *thread)
00327 {
00328 if (thread->options & GEARMAND_THREAD_WAKEUP_EVENT)
00329 {
00330 gearmand_log_info(thread->gearmand, "[%4u] Clearing event for IO thread wakeup pipe", thread->count);
00331 int del_ret= event_del(&(thread->wakeup_event));
00332 assert(del_ret == 0);
00333 thread->options&= (gearmand_thread_options_t)~GEARMAND_THREAD_WAKEUP_EVENT;
00334 }
00335 }
00336
00337 static void _wakeup_event(int fd, short events __attribute__ ((unused)),
00338 void *arg)
00339 {
00340 gearmand_thread_st *thread= (gearmand_thread_st *)arg;
00341 uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00342 ssize_t ret;
00343 ssize_t x;
00344
00345 while (1)
00346 {
00347 ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00348 if (ret == 0)
00349 {
00350 _clear_events(thread);
00351 gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:EOF");
00352 thread->gearmand->ret= GEARMAN_PIPE_EOF;
00353 return;
00354 }
00355 else if (ret == -1)
00356 {
00357 if (errno == EINTR)
00358 continue;
00359
00360 if (errno == EAGAIN)
00361 break;
00362
00363 _clear_events(thread);
00364 gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:%d", errno);
00365 thread->gearmand->ret= GEARMAN_ERRNO;
00366 return;
00367 }
00368
00369 for (x= 0; x < ret; x++)
00370 {
00371 switch ((gearmand_wakeup_t)buffer[x])
00372 {
00373 case GEARMAND_WAKEUP_PAUSE:
00374 gearmand_log_info(thread->gearmand, "[%4u] Received PAUSE wakeup event", thread->count);
00375 break;
00376
00377 case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00378 gearmand_log_info(thread->gearmand,
00379 "[%4u] Received SHUTDOWN_GRACEFUL wakeup event",
00380 thread->count);
00381 if (gearman_server_shutdown_graceful(&(thread->gearmand->server)) == GEARMAN_SHUTDOWN)
00382 {
00383 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00384 }
00385 break;
00386
00387 case GEARMAND_WAKEUP_SHUTDOWN:
00388 gearmand_log_info(thread->gearmand, "[%4u] Received SHUTDOWN wakeup event", thread->count);
00389 _clear_events(thread);
00390 break;
00391
00392 case GEARMAND_WAKEUP_CON:
00393 gearmand_log_info(thread->gearmand, "[%4u] Received CON wakeup event", thread->count);
00394 gearmand_con_check_queue(thread);
00395 break;
00396
00397 case GEARMAND_WAKEUP_RUN:
00398 gearmand_log_debug(thread->gearmand, "[%4u] Received RUN wakeup event", thread->count);
00399 gearmand_thread_run(thread);
00400 break;
00401
00402 default:
00403 gearmand_log_fatal(thread->gearmand, "[%4u] Received unknown wakeup event (%u)", thread->count, buffer[x]);
00404 _clear_events(thread);
00405 thread->gearmand->ret= GEARMAN_UNKNOWN_STATE;
00406 break;
00407 }
00408 }
00409 }
00410 }
00411
00412 static void _clear_events(gearmand_thread_st *thread)
00413 {
00414 _wakeup_clear(thread);
00415
00416 while (thread->dcon_list != NULL)
00417 gearmand_con_free(thread->dcon_list);
00418 }