00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 gearman_return_t _queue_replay_add(gearman_server_st *server, void *context,
00030 const void *unique, size_t unique_size,
00031 const void *function_name,
00032 size_t function_name_size, const void *data,
00033 size_t data_size,
00034 gearman_job_priority_t priority);
00035
00039 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00040 const char *error_code,
00041 const char *error_string);
00042
00046 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00047 gearman_packet_st *packet);
00048
00052 static gearman_return_t
00053 _server_queue_work_data(gearman_server_job_st *server_job,
00054 gearman_packet_st *packet, gearman_command_t command);
00055
00059 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00060
00063
00064
00065
00066
00067 gearman_server_st *gearman_server_create(gearman_server_st *server)
00068 {
00069 struct utsname un;
00070
00071 if (server == NULL)
00072 {
00073 server= malloc(sizeof(gearman_server_st));
00074 if (server == NULL)
00075 return NULL;
00076
00077 server->options.allocated= true;
00078 }
00079 else
00080 server->options.allocated= false;
00081
00082 server->state.queue_startup= false;
00083 server->flags.round_robin= false;
00084 server->flags.threaded= false;
00085 server->shutdown= false;
00086 server->shutdown_graceful= false;
00087 server->proc_wakeup= false;
00088 server->proc_shutdown= false;
00089 server->job_retries= 0;
00090 server->worker_wakeup= 0;
00091 server->thread_count= 0;
00092 server->free_packet_count= 0;
00093 server->function_count= 0;
00094 server->job_count= 0;
00095 server->unique_count= 0;
00096 server->free_job_count= 0;
00097 server->free_client_count= 0;
00098 server->free_worker_count= 0;
00099 server->thread_list= NULL;
00100 server->free_packet_list= NULL;
00101 server->function_list= NULL;
00102 server->free_job_list= NULL;
00103 server->free_client_list= NULL;
00104 server->free_worker_list= NULL;
00105 server->log_fn= NULL;
00106 server->log_context= NULL;
00107 server->queue_context= NULL;
00108 server->queue_add_fn= NULL;
00109 server->queue_flush_fn= NULL;
00110 server->queue_done_fn= NULL;
00111 server->queue_replay_fn= NULL;
00112 memset(server->job_hash, 0,
00113 sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00114 memset(server->unique_hash, 0,
00115 sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00116
00117 server->gearman= gearman_universal_create(&(server->gearman_universal_static), NULL);
00118 if (server->gearman == NULL)
00119 {
00120 gearman_server_free(server);
00121 return NULL;
00122 }
00123
00124 if (uname(&un) == -1)
00125 {
00126 gearman_server_free(server);
00127 return NULL;
00128 }
00129
00130 snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
00131 un.nodename);
00132 server->job_handle_count= 1;
00133
00134 return server;
00135 }
00136
00137 void gearman_server_free(gearman_server_st *server)
00138 {
00139 uint32_t key;
00140 gearman_server_packet_st *packet;
00141 gearman_server_job_st *job;
00142 gearman_server_client_st *client;
00143 gearman_server_worker_st *worker;
00144
00145
00146 assert(server->thread_list == NULL);
00147
00148 for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
00149 {
00150 while (server->job_hash[key] != NULL)
00151 gearman_server_job_free(server->job_hash[key]);
00152 }
00153
00154 while (server->function_list != NULL)
00155 gearman_server_function_free(server->function_list);
00156
00157 while (server->free_packet_list != NULL)
00158 {
00159 packet= server->free_packet_list;
00160 server->free_packet_list= packet->next;
00161 free(packet);
00162 }
00163
00164 while (server->free_job_list != NULL)
00165 {
00166 job= server->free_job_list;
00167 server->free_job_list= job->next;
00168 free(job);
00169 }
00170
00171 while (server->free_client_list != NULL)
00172 {
00173 client= server->free_client_list;
00174 server->free_client_list= client->con_next;
00175 free(client);
00176 }
00177
00178 while (server->free_worker_list != NULL)
00179 {
00180 worker= server->free_worker_list;
00181 server->free_worker_list= worker->con_next;
00182 free(worker);
00183 }
00184
00185 if (server->gearman != NULL)
00186 gearman_universal_free(server->gearman);
00187
00188 if (server->options.allocated)
00189 free(server);
00190 }
00191
00192 void gearman_server_set_job_retries(gearman_server_st *server,
00193 uint8_t job_retries)
00194 {
00195 server->job_retries= job_retries;
00196 }
00197
00198 void gearman_server_set_worker_wakeup(gearman_server_st *server,
00199 uint8_t worker_wakeup)
00200 {
00201 server->worker_wakeup= worker_wakeup;
00202 }
00203
00204 void gearman_server_set_log_fn(gearman_server_st *server,
00205 gearman_log_fn *function,
00206 void *context, gearman_verbose_t verbose)
00207 {
00208 server->log_fn= function;
00209 server->log_context= context;
00210 gearman_set_log_fn(server->gearman, _log, server, verbose);
00211 }
00212
00213 gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
00214 gearman_packet_st *packet)
00215 {
00216 gearman_return_t ret;
00217 gearman_server_job_st *server_job;
00218 char job_handle[GEARMAN_JOB_HANDLE_SIZE];
00219 char option[GEARMAN_OPTION_SIZE];
00220 gearman_server_client_st *server_client;
00221 char numerator_buffer[11];
00222 char denominator_buffer[11];
00223 gearman_job_priority_t priority;
00224 gearman_server_st *server= server_con->thread->server;
00225
00226 if (packet->magic == GEARMAN_MAGIC_RESPONSE)
00227 {
00228 return _server_error_packet(server_con, "bad_magic",
00229 "Request magic expected");
00230 }
00231
00232 switch (packet->command)
00233 {
00234
00235 case GEARMAN_COMMAND_ECHO_REQ:
00236
00237 ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
00238 GEARMAN_COMMAND_ECHO_RES, packet->data,
00239 packet->data_size, NULL);
00240 if (ret != GEARMAN_SUCCESS)
00241 return ret;
00242
00243 packet->options.free_data= false;
00244
00245 break;
00246
00247
00248 case GEARMAN_COMMAND_SUBMIT_JOB:
00249 case GEARMAN_COMMAND_SUBMIT_JOB_BG:
00250 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
00251 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
00252 case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
00253 case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
00254
00255 if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
00256 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG)
00257 {
00258 priority= GEARMAN_JOB_PRIORITY_NORMAL;
00259 }
00260 else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
00261 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
00262 {
00263 priority= GEARMAN_JOB_PRIORITY_HIGH;
00264 }
00265 else
00266 priority= GEARMAN_JOB_PRIORITY_LOW;
00267
00268 if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
00269 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
00270 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
00271 {
00272 server_client= NULL;
00273 }
00274 else
00275 {
00276 server_client= gearman_server_client_add(server_con);
00277 if (server_client == NULL)
00278 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00279 }
00280
00281
00282 server_job= gearman_server_job_add(server_con->thread->server,
00283 (char *)(packet->arg[0]),
00284 packet->arg_size[0] - 1,
00285 (char *)(packet->arg[1]),
00286 packet->arg_size[1] - 1, packet->data,
00287 packet->data_size, priority,
00288 server_client, &ret);
00289 if (ret == GEARMAN_SUCCESS)
00290 {
00291 packet->options.free_data= false;
00292 }
00293 else if (ret == GEARMAN_JOB_QUEUE_FULL)
00294 {
00295 return _server_error_packet(server_con, "queue_full",
00296 "Job queue is full");
00297 }
00298 else if (ret != GEARMAN_JOB_EXISTS)
00299 return ret;
00300
00301
00302 ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00303 GEARMAN_COMMAND_JOB_CREATED,
00304 server_job->job_handle,
00305 (size_t)strlen(server_job->job_handle),
00306 NULL);
00307 if (ret != GEARMAN_SUCCESS)
00308 return ret;
00309
00310 break;
00311
00312 case GEARMAN_COMMAND_GET_STATUS:
00313
00314 snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00315 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00316
00317 server_job= gearman_server_job_get(server_con->thread->server, job_handle, NULL);
00318
00319
00320 if (server_job == NULL)
00321 {
00322 ret= gearman_server_io_packet_add(server_con, false,
00323 GEARMAN_MAGIC_RESPONSE,
00324 GEARMAN_COMMAND_STATUS_RES, job_handle,
00325 (size_t)(strlen(job_handle) + 1),
00326 "0", (size_t)2, "0", (size_t)2, "0",
00327 (size_t)2, "0", (size_t)1, NULL);
00328 }
00329 else
00330 {
00331 snprintf(numerator_buffer, 11, "%u", server_job->numerator);
00332 snprintf(denominator_buffer, 11, "%u", server_job->denominator);
00333
00334 ret= gearman_server_io_packet_add(server_con, false,
00335 GEARMAN_MAGIC_RESPONSE,
00336 GEARMAN_COMMAND_STATUS_RES, job_handle,
00337 (size_t)(strlen(job_handle) + 1),
00338 "1", (size_t)2,
00339 server_job->worker == NULL ? "0" : "1",
00340 (size_t)2, numerator_buffer,
00341 (size_t)(strlen(numerator_buffer) + 1),
00342 denominator_buffer,
00343 (size_t)strlen(denominator_buffer),
00344 NULL);
00345 }
00346
00347 if (ret != GEARMAN_SUCCESS)
00348 return ret;
00349
00350 break;
00351
00352 case GEARMAN_COMMAND_OPTION_REQ:
00353
00354 snprintf(option, GEARMAN_OPTION_SIZE, "%.*s",
00355 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00356 if (!strcasecmp(option, "exceptions"))
00357 server_con->options|= GEARMAN_SERVER_CON_EXCEPTIONS;
00358 else
00359 {
00360 return _server_error_packet(server_con, "unknown_option",
00361 "Server does not recognize given option");
00362 }
00363
00364 ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00365 GEARMAN_COMMAND_OPTION_RES,
00366 packet->arg[0], packet->arg_size[0],
00367 NULL);
00368 if (ret != GEARMAN_SUCCESS)
00369 return ret;
00370
00371 break;
00372
00373
00374 case GEARMAN_COMMAND_CAN_DO:
00375 if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00376 packet->arg_size[0], 0) == NULL)
00377 {
00378 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00379 }
00380
00381 break;
00382
00383 case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
00384 if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00385 packet->arg_size[0] - 1,
00386 (in_port_t)atoi((char *)(packet->arg[1])))
00387 == NULL)
00388 {
00389 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00390 }
00391
00392 break;
00393
00394 case GEARMAN_COMMAND_CANT_DO:
00395 gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
00396 packet->arg_size[0]);
00397 break;
00398
00399 case GEARMAN_COMMAND_RESET_ABILITIES:
00400 gearman_server_con_free_workers(server_con);
00401 break;
00402
00403 case GEARMAN_COMMAND_PRE_SLEEP:
00404 server_job= gearman_server_job_peek(server_con);
00405 if (server_job == NULL)
00406 server_con->options|= GEARMAN_SERVER_CON_SLEEPING;
00407 else
00408 {
00409
00410
00411 ret= gearman_server_io_packet_add(server_con, false,
00412 GEARMAN_MAGIC_RESPONSE,
00413 GEARMAN_COMMAND_NOOP, NULL);
00414 if (ret != GEARMAN_SUCCESS)
00415 return ret;
00416 }
00417
00418 break;
00419
00420 case GEARMAN_COMMAND_GRAB_JOB:
00421 case GEARMAN_COMMAND_GRAB_JOB_UNIQ:
00422 server_con->options&=
00423 (gearman_server_con_options_t)~(GEARMAN_SERVER_CON_SLEEPING |
00424 GEARMAN_SERVER_CON_NOOP_SENT);
00425
00426 server_job= gearman_server_job_take(server_con);
00427 if (server_job == NULL)
00428 {
00429
00430 ret= gearman_server_io_packet_add(server_con, false,
00431 GEARMAN_MAGIC_RESPONSE,
00432 GEARMAN_COMMAND_NO_JOB, NULL);
00433 }
00434 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
00435 {
00436
00437
00438 ret= gearman_server_io_packet_add(server_con, false,
00439 GEARMAN_MAGIC_RESPONSE,
00440 GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
00441 server_job->job_handle,
00442 (size_t)(strlen(server_job->job_handle) + 1),
00443 server_job->function->function_name,
00444 server_job->function->function_name_size + 1,
00445 server_job->unique,
00446 (size_t)(strlen(server_job->unique) + 1),
00447 server_job->data, server_job->data_size,
00448 NULL);
00449 }
00450 else
00451 {
00452
00453 ret= gearman_server_io_packet_add(server_con, false,
00454 GEARMAN_MAGIC_RESPONSE,
00455 GEARMAN_COMMAND_JOB_ASSIGN,
00456 server_job->job_handle,
00457 (size_t)(strlen(server_job->job_handle) + 1),
00458 server_job->function->function_name,
00459 server_job->function->function_name_size + 1,
00460 server_job->data, server_job->data_size,
00461 NULL);
00462 }
00463
00464 if (ret != GEARMAN_SUCCESS)
00465 {
00466 if (server_job != NULL)
00467 return gearman_server_job_queue(server_job);
00468 return ret;
00469 }
00470
00471 break;
00472
00473 case GEARMAN_COMMAND_WORK_DATA:
00474 case GEARMAN_COMMAND_WORK_WARNING:
00475 server_job= gearman_server_job_get(server_con->thread->server,
00476 (char *)(packet->arg[0]),
00477 server_con);
00478 if (server_job == NULL)
00479 {
00480 return _server_error_packet(server_con, "job_not_found",
00481 "Job given in work result not found");
00482 }
00483
00484
00485 ret= _server_queue_work_data(server_job, packet, packet->command);
00486 if (ret != GEARMAN_SUCCESS)
00487 return ret;
00488
00489 break;
00490
00491 case GEARMAN_COMMAND_WORK_STATUS:
00492 server_job= gearman_server_job_get(server_con->thread->server,
00493 (char *)(packet->arg[0]),
00494 server_con);
00495 if (server_job == NULL)
00496 {
00497 return _server_error_packet(server_con, "job_not_found",
00498 "Job given in work result not found");
00499 }
00500
00501
00502 server_job->numerator= (uint32_t)atoi((char *)(packet->arg[1]));
00503
00504
00505 snprintf(denominator_buffer, 11, "%.*s", (uint32_t)(packet->arg_size[2]),
00506 (char *)(packet->arg[2]));
00507 server_job->denominator= (uint32_t)atoi(denominator_buffer);
00508
00509
00510 for (server_client= server_job->client_list; server_client;
00511 server_client= server_client->job_next)
00512 {
00513 ret= gearman_server_io_packet_add(server_client->con, false,
00514 GEARMAN_MAGIC_RESPONSE,
00515 GEARMAN_COMMAND_WORK_STATUS,
00516 packet->arg[0], packet->arg_size[0],
00517 packet->arg[1], packet->arg_size[1],
00518 packet->arg[2], packet->arg_size[2],
00519 NULL);
00520 if (ret != GEARMAN_SUCCESS)
00521 return ret;
00522 }
00523
00524 break;
00525
00526 case GEARMAN_COMMAND_WORK_COMPLETE:
00527 server_job= gearman_server_job_get(server_con->thread->server,
00528 (char *)(packet->arg[0]),
00529 server_con);
00530 if (server_job == NULL)
00531 {
00532 return _server_error_packet(server_con, "job_not_found",
00533 "Job given in work result not found");
00534 }
00535
00536
00537 ret= _server_queue_work_data(server_job, packet,
00538 GEARMAN_COMMAND_WORK_COMPLETE);
00539 if (ret != GEARMAN_SUCCESS)
00540 return ret;
00541
00542
00543 if (server_job->state & GEARMAN_SERVER_JOB_QUEUED &&
00544 server->queue_done_fn != NULL)
00545 {
00546 ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00547 server_job->unique,
00548 (size_t)strlen(server_job->unique),
00549 server_job->function->function_name,
00550 server_job->function->function_name_size);
00551 if (ret != GEARMAN_SUCCESS)
00552 return ret;
00553 }
00554
00555
00556 gearman_server_job_free(server_job);
00557 break;
00558
00559 case GEARMAN_COMMAND_WORK_EXCEPTION:
00560 server_job= gearman_server_job_get(server_con->thread->server,
00561 (char *)(packet->arg[0]),
00562 server_con);
00563 if (server_job == NULL)
00564 {
00565 return _server_error_packet(server_con, "job_not_found",
00566 "Job given in work result not found");
00567 }
00568
00569
00570 ret= _server_queue_work_data(server_job, packet,
00571 GEARMAN_COMMAND_WORK_EXCEPTION);
00572 if (ret != GEARMAN_SUCCESS)
00573 return ret;
00574 break;
00575
00576 case GEARMAN_COMMAND_WORK_FAIL:
00577
00578 snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00579 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00580
00581 server_job= gearman_server_job_get(server_con->thread->server, job_handle,
00582 server_con);
00583 if (server_job == NULL)
00584 {
00585 return _server_error_packet(server_con, "job_not_found",
00586 "Job given in work result not found");
00587 }
00588
00589
00590 for (server_client= server_job->client_list; server_client;
00591 server_client= server_client->job_next)
00592 {
00593 ret= gearman_server_io_packet_add(server_client->con, false,
00594 GEARMAN_MAGIC_RESPONSE,
00595 GEARMAN_COMMAND_WORK_FAIL,
00596 packet->arg[0], packet->arg_size[0],
00597 NULL);
00598 if (ret != GEARMAN_SUCCESS)
00599 return ret;
00600 }
00601
00602
00603 if (server_job->state & GEARMAN_SERVER_JOB_QUEUED &&
00604 server->queue_done_fn != NULL)
00605 {
00606 ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00607 server_job->unique,
00608 (size_t)strlen(server_job->unique),
00609 server_job->function->function_name,
00610 server_job->function->function_name_size);
00611 if (ret != GEARMAN_SUCCESS)
00612 return ret;
00613 }
00614
00615
00616 gearman_server_job_free(server_job);
00617 break;
00618
00619 case GEARMAN_COMMAND_SET_CLIENT_ID:
00620 gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
00621 packet->arg_size[0]);
00622 break;
00623
00624 case GEARMAN_COMMAND_TEXT:
00625 return _server_run_text(server_con, packet);
00626
00627 case GEARMAN_COMMAND_UNUSED:
00628 case GEARMAN_COMMAND_NOOP:
00629 case GEARMAN_COMMAND_JOB_CREATED:
00630 case GEARMAN_COMMAND_NO_JOB:
00631 case GEARMAN_COMMAND_JOB_ASSIGN:
00632 case GEARMAN_COMMAND_ECHO_RES:
00633 case GEARMAN_COMMAND_ERROR:
00634 case GEARMAN_COMMAND_STATUS_RES:
00635 case GEARMAN_COMMAND_ALL_YOURS:
00636 case GEARMAN_COMMAND_OPTION_RES:
00637 case GEARMAN_COMMAND_SUBMIT_JOB_SCHED:
00638 case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
00639 case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
00640 case GEARMAN_COMMAND_MAX:
00641 default:
00642 return _server_error_packet(server_con, "bad_command",
00643 "Command not expected");
00644 }
00645
00646 return GEARMAN_SUCCESS;
00647 }
00648
00649 gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
00650 {
00651 server->shutdown_graceful= true;
00652
00653 if (server->job_count == 0)
00654 return GEARMAN_SHUTDOWN;
00655
00656 return GEARMAN_SHUTDOWN_GRACEFUL;
00657 }
00658
00659 gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
00660 {
00661 gearman_return_t ret;
00662
00663 if (server->queue_replay_fn == NULL)
00664 return GEARMAN_SUCCESS;
00665
00666 server->state.queue_startup= true;
00667
00668 ret= (*(server->queue_replay_fn))(server, (void *)server->queue_context,
00669 _queue_replay_add, server);
00670
00671 server->state.queue_startup= false;
00672
00673 return ret;
00674 }
00675
00676 void *gearman_server_queue_context(const gearman_server_st *server)
00677 {
00678 return (void *)server->queue_context;
00679 }
00680
00681 void gearman_server_set_queue_context(gearman_server_st *server,
00682 void *context)
00683 {
00684 server->queue_context= context;
00685 }
00686
00687 void gearman_server_set_queue_add_fn(gearman_server_st *server,
00688 gearman_queue_add_fn *function)
00689 {
00690 server->queue_add_fn= function;
00691 }
00692
00693 void gearman_server_set_queue_flush_fn(gearman_server_st *server,
00694 gearman_queue_flush_fn *function)
00695 {
00696 server->queue_flush_fn= function;
00697 }
00698
00699 void gearman_server_set_queue_done_fn(gearman_server_st *server,
00700 gearman_queue_done_fn *function)
00701 {
00702 server->queue_done_fn= function;
00703 }
00704
00705 void gearman_server_set_queue_replay_fn(gearman_server_st *server,
00706 gearman_queue_replay_fn *function)
00707 {
00708 server->queue_replay_fn= function;
00709 }
00710
00711
00712
00713
00714
00715 gearman_return_t _queue_replay_add(gearman_server_st *server,
00716 void *context __attribute__ ((unused)),
00717 const void *unique, size_t unique_size,
00718 const void *function_name,
00719 size_t function_name_size, const void *data,
00720 size_t data_size,
00721 gearman_job_priority_t priority)
00722 {
00723 gearman_return_t ret;
00724
00725 (void)gearman_server_job_add(server, (char *)function_name,
00726 function_name_size, (char *)unique, unique_size,
00727 data, data_size, priority, NULL, &ret);
00728 return ret;
00729 }
00730
00731 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00732 const char *error_code,
00733 const char *error_string)
00734 {
00735 return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00736 GEARMAN_COMMAND_ERROR, error_code,
00737 (size_t)(strlen(error_code) + 1),
00738 error_string,
00739 (size_t)strlen(error_string), NULL);
00740 }
00741
00742 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00743 gearman_packet_st *packet)
00744 {
00745 char *data;
00746 char *new_data;
00747 size_t size;
00748 size_t total;
00749 int max_queue_size;
00750 gearman_server_thread_st *thread;
00751 gearman_server_con_st *con;
00752 gearman_server_worker_st *worker;
00753 gearman_server_function_st *function;
00754 gearman_server_packet_st *server_packet;
00755
00756 data= malloc(GEARMAN_TEXT_RESPONSE_SIZE);
00757 if (data == NULL)
00758 {
00759 gearman_log_error(packet->universal, "_server_run_text", "malloc");
00760 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00761 }
00762 total= GEARMAN_TEXT_RESPONSE_SIZE;
00763
00764 if (packet->argc == 0)
00765 {
00766 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00767 "ERR unknown_command Unknown+server+command\n");
00768 }
00769 else if (!strcasecmp("workers", (char *)(packet->arg[0])))
00770 {
00771 size= 0;
00772
00773 for (thread= server_con->thread->server->thread_list; thread != NULL;
00774 thread= thread->next)
00775 {
00776 (void) pthread_mutex_lock(&thread->lock);
00777
00778 for (con= thread->con_list; con != NULL; con= con->next)
00779 {
00780 if (con->host == NULL)
00781 continue;
00782
00783 if (size > total)
00784 size= total;
00785
00786
00787 if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00788 {
00789 new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00790 if (new_data == NULL)
00791 {
00792 (void) pthread_mutex_unlock(&thread->lock);
00793 free(data);
00794 gearman_log_error(packet->universal, "_server_run_text", "malloc");
00795 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00796 }
00797
00798 data= new_data;
00799 total+= GEARMAN_TEXT_RESPONSE_SIZE;
00800 }
00801
00802 size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
00803 con->con.fd, con->host, con->id);
00804 if (size > total)
00805 continue;
00806
00807 for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
00808 {
00809 size+= (size_t)snprintf(data + size, total - size, " %.*s",
00810 (int)(worker->function->function_name_size),
00811 worker->function->function_name);
00812 if (size > total)
00813 break;
00814 }
00815
00816 if (size > total)
00817 continue;
00818
00819 size+= (size_t)snprintf(data + size, total - size, "\n");
00820 }
00821
00822 (void) pthread_mutex_unlock(&thread->lock);
00823 }
00824
00825 if (size < total)
00826 snprintf(data + size, total - size, ".\n");
00827 }
00828 else if (!strcasecmp("status", (char *)(packet->arg[0])))
00829 {
00830 size= 0;
00831
00832 for (function= server_con->thread->server->function_list; function != NULL;
00833 function= function->next)
00834 {
00835 if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00836 {
00837 new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00838 if (new_data == NULL)
00839 {
00840 free(data);
00841 gearman_log_error(packet->universal, "_server_run_text", "malloc");
00842 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00843 }
00844
00845 data= new_data;
00846 total+= GEARMAN_TEXT_RESPONSE_SIZE;
00847 }
00848
00849 size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
00850 (int)(function->function_name_size),
00851 function->function_name, function->job_total,
00852 function->job_running, function->worker_count);
00853 if (size > total)
00854 size= total;
00855 }
00856
00857 if (size < total)
00858 snprintf(data + size, total - size, ".\n");
00859 }
00860 else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
00861 {
00862 if (packet->argc == 1)
00863 {
00864 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
00865 "An+incomplete+set+of+arguments+was+sent+to+this+command\n");
00866 }
00867 else
00868 {
00869 if (packet->argc == 2)
00870 max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
00871 else
00872 {
00873 max_queue_size= atoi((char *)(packet->arg[2]));
00874 if (max_queue_size < 0)
00875 max_queue_size= 0;
00876 }
00877
00878 for (function= server_con->thread->server->function_list;
00879 function != NULL; function= function->next)
00880 {
00881 if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
00882 !memcmp(packet->arg[1], function->function_name,
00883 function->function_name_size))
00884 {
00885 function->max_queue_size= (uint32_t)max_queue_size;
00886 }
00887 }
00888
00889 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00890 }
00891 }
00892 else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
00893 {
00894 if (packet->argc == 1)
00895 {
00896 server_con->thread->server->shutdown= true;
00897 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00898 }
00899 else if (packet->argc == 2 &&
00900 !strcasecmp("graceful", (char *)(packet->arg[1])))
00901 {
00902 server_con->thread->server->shutdown_graceful= true;
00903 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00904 }
00905 else
00906 {
00907 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00908 "ERR unknown_args Unknown+arguments+to+server+command\n");
00909 }
00910 }
00911 else if (!strcasecmp("version", (char *)(packet->arg[0])))
00912 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
00913 else
00914 {
00915 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00916 "ERR unknown_command Unknown+server+command\n");
00917 }
00918
00919 server_packet= gearman_server_packet_create(server_con->thread, false);
00920 if (server_packet == NULL)
00921 {
00922 free(data);
00923 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00924 }
00925
00926 if (gearman_packet_create(server_con->thread->gearman,
00927 &(server_packet->packet)) == NULL)
00928 {
00929 free(data);
00930 gearman_server_packet_free(server_packet, server_con->thread, false);
00931 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00932 }
00933
00934 server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
00935 server_packet->packet.command= GEARMAN_COMMAND_TEXT;
00936 server_packet->packet.options.complete= true;
00937 server_packet->packet.options.free_data= true;
00938
00939 server_packet->packet.data= data;
00940 server_packet->packet.data_size= strlen(data);
00941
00942 (void) pthread_mutex_lock(&server_con->thread->lock);
00943 GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
00944 (void) pthread_mutex_unlock(&server_con->thread->lock);
00945
00946 gearman_server_con_io_add(server_con);
00947
00948 return GEARMAN_SUCCESS;
00949 }
00950
00951 static gearman_return_t
00952 _server_queue_work_data(gearman_server_job_st *server_job,
00953 gearman_packet_st *packet, gearman_command_t command)
00954 {
00955 gearman_server_client_st *server_client;
00956 uint8_t *data;
00957 gearman_return_t ret;
00958
00959 for (server_client= server_job->client_list; server_client;
00960 server_client= server_client->job_next)
00961 {
00962 if (command == GEARMAN_COMMAND_WORK_EXCEPTION &&
00963 !(server_client->con->options & GEARMAN_SERVER_CON_EXCEPTIONS))
00964 {
00965 continue;
00966 }
00967
00968 if (packet->data_size > 0)
00969 {
00970 if (packet->options.free_data &&
00971 server_client->job_next == NULL)
00972 {
00973 data= (uint8_t *)(packet->data);
00974 packet->options.free_data= false;
00975 }
00976 else
00977 {
00978 data= malloc(packet->data_size);
00979 if (data == NULL)
00980 {
00981 gearman_log_error(packet->universal, "_server_run_command", "malloc");
00982 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00983 }
00984
00985 memcpy(data, packet->data, packet->data_size);
00986 }
00987 }
00988 else
00989 data= NULL;
00990
00991 ret= gearman_server_io_packet_add(server_client->con, true,
00992 GEARMAN_MAGIC_RESPONSE, command,
00993 packet->arg[0], packet->arg_size[0],
00994 data, packet->data_size, NULL);
00995 if (ret != GEARMAN_SUCCESS)
00996 return ret;
00997 }
00998
00999 return GEARMAN_SUCCESS;
01000 }
01001
01002 static void _log(const char *line, gearman_verbose_t verbose, void *context)
01003 {
01004 gearman_server_st *server= (gearman_server_st *)context;
01005 (*(server->log_fn))(line, verbose, (void *)server->log_context);
01006 }