00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00025 static gearman_return_t _job_send(gearman_job_st *job);
00026
00029
00030
00031
00032
00033 gearman_job_st *gearman_job_create(gearman_worker_st *worker,
00034 gearman_job_st *job)
00035 {
00036 if (job == NULL)
00037 {
00038 job= malloc(sizeof(gearman_job_st));
00039 if (job == NULL)
00040 {
00041 gearman_universal_set_error((&worker->universal), "_job_create", "malloc");
00042 return NULL;
00043 }
00044
00045 job->options.allocated= true;
00046 }
00047 else
00048 {
00049 job->options.allocated= false;
00050 }
00051
00052 job->options.assigned_in_use= false;
00053 job->options.work_in_use= false;
00054 job->options.finished= false;
00055
00056 job->worker= worker;
00057
00058 if (worker->job_list != NULL)
00059 worker->job_list->prev= job;
00060 job->next= worker->job_list;
00061 job->prev= NULL;
00062 worker->job_list= job;
00063 worker->job_count++;
00064
00065 job->con= NULL;
00066
00067 return job;
00068 }
00069
00070
00071 gearman_return_t gearman_job_send_data(gearman_job_st *job, const void *data,
00072 size_t data_size)
00073 {
00074 gearman_return_t ret;
00075 const void *args[2];
00076 size_t args_size[2];
00077
00078 if (! (job->options.work_in_use))
00079 {
00080 args[0]= job->assigned.arg[0];
00081 args_size[0]= job->assigned.arg_size[0];
00082 args[1]= data;
00083 args_size[1]= data_size;
00084 ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00085 GEARMAN_MAGIC_REQUEST,
00086 GEARMAN_COMMAND_WORK_DATA,
00087 args, args_size, 2);
00088 if (ret != GEARMAN_SUCCESS)
00089 return ret;
00090
00091 job->options.work_in_use= true;
00092 }
00093
00094 return _job_send(job);
00095 }
00096
00097 gearman_return_t gearman_job_send_warning(gearman_job_st *job,
00098 const void *warning,
00099 size_t warning_size)
00100 {
00101 gearman_return_t ret;
00102 const void *args[2];
00103 size_t args_size[2];
00104
00105 if (! (job->options.work_in_use))
00106 {
00107 args[0]= job->assigned.arg[0];
00108 args_size[0]= job->assigned.arg_size[0];
00109 args[1]= warning;
00110 args_size[1]= warning_size;
00111 ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00112 GEARMAN_MAGIC_REQUEST,
00113 GEARMAN_COMMAND_WORK_WARNING,
00114 args, args_size, 2);
00115 if (ret != GEARMAN_SUCCESS)
00116 return ret;
00117
00118 job->options.work_in_use= true;
00119 }
00120
00121 return _job_send(job);
00122 }
00123
00124 gearman_return_t gearman_job_send_status(gearman_job_st *job,
00125 uint32_t numerator,
00126 uint32_t denominator)
00127 {
00128 gearman_return_t ret;
00129 char numerator_string[12];
00130 char denominator_string[12];
00131 const void *args[3];
00132 size_t args_size[3];
00133
00134 if (! (job->options.work_in_use))
00135 {
00136 snprintf(numerator_string, 12, "%u", numerator);
00137 snprintf(denominator_string, 12, "%u", denominator);
00138
00139 args[0]= job->assigned.arg[0];
00140 args_size[0]= job->assigned.arg_size[0];
00141 args[1]= numerator_string;
00142 args_size[1]= strlen(numerator_string) + 1;
00143 args[2]= denominator_string;
00144 args_size[2]= strlen(denominator_string);
00145 ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00146 GEARMAN_MAGIC_REQUEST,
00147 GEARMAN_COMMAND_WORK_STATUS,
00148 args, args_size, 3);
00149 if (ret != GEARMAN_SUCCESS)
00150 return ret;
00151
00152 job->options.work_in_use= true;
00153 }
00154
00155 return _job_send(job);
00156 }
00157
00158 gearman_return_t gearman_job_send_complete(gearman_job_st *job,
00159 const void *result,
00160 size_t result_size)
00161 {
00162 gearman_return_t ret;
00163 const void *args[2];
00164 size_t args_size[2];
00165
00166 if (job->options.finished)
00167 return GEARMAN_SUCCESS;
00168
00169 if (! (job->options.work_in_use))
00170 {
00171 args[0]= job->assigned.arg[0];
00172 args_size[0]= job->assigned.arg_size[0];
00173 args[1]= result;
00174 args_size[1]= result_size;
00175 ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00176 GEARMAN_MAGIC_REQUEST,
00177 GEARMAN_COMMAND_WORK_COMPLETE,
00178 args, args_size, 2);
00179 if (ret != GEARMAN_SUCCESS)
00180 return ret;
00181
00182 job->options.work_in_use= true;
00183 }
00184
00185 ret= _job_send(job);
00186 if (ret != GEARMAN_SUCCESS)
00187 return ret;
00188
00189 job->options.finished= true;
00190
00191 return GEARMAN_SUCCESS;
00192 }
00193
00194 gearman_return_t gearman_job_send_exception(gearman_job_st *job,
00195 const void *exception,
00196 size_t exception_size)
00197 {
00198 gearman_return_t ret;
00199 const void *args[2];
00200 size_t args_size[2];
00201
00202 if (! (job->options.work_in_use))
00203 {
00204 args[0]= job->assigned.arg[0];
00205 args_size[0]= job->assigned.arg_size[0];
00206 args[1]= exception;
00207 args_size[1]= exception_size;
00208 ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00209 GEARMAN_MAGIC_REQUEST,
00210 GEARMAN_COMMAND_WORK_EXCEPTION,
00211 args, args_size, 2);
00212 if (ret != GEARMAN_SUCCESS)
00213 return ret;
00214
00215 job->options.work_in_use= true;
00216 }
00217
00218 return _job_send(job);
00219 }
00220
00221 gearman_return_t gearman_job_send_fail(gearman_job_st *job)
00222 {
00223 gearman_return_t ret;
00224 const void *args[1];
00225 size_t args_size[1];
00226
00227 if (job->options.finished)
00228 return GEARMAN_SUCCESS;
00229
00230 if (! (job->options.work_in_use))
00231 {
00232 args[0]= job->assigned.arg[0];
00233 args_size[0]= job->assigned.arg_size[0] - 1;
00234 ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00235 GEARMAN_MAGIC_REQUEST,
00236 GEARMAN_COMMAND_WORK_FAIL,
00237 args, args_size, 1);
00238 if (ret != GEARMAN_SUCCESS)
00239 return ret;
00240
00241 job->options.work_in_use= true;
00242 }
00243
00244 ret= _job_send(job);
00245 if (ret != GEARMAN_SUCCESS)
00246 return ret;
00247
00248 job->options.finished= true;
00249 return GEARMAN_SUCCESS;
00250 }
00251
00252 const char *gearman_job_handle(const gearman_job_st *job)
00253 {
00254 return (const char *)job->assigned.arg[0];
00255 }
00256
00257 const char *gearman_job_function_name(const gearman_job_st *job)
00258 {
00259 return (const char *)job->assigned.arg[1];
00260 }
00261
00262 const char *gearman_job_unique(const gearman_job_st *job)
00263 {
00264 if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
00265 return (const char *)job->assigned.arg[2];
00266 return "";
00267 }
00268
00269 const void *gearman_job_workload(const gearman_job_st *job)
00270 {
00271 return job->assigned.data;
00272 }
00273
00274 size_t gearman_job_workload_size(const gearman_job_st *job)
00275 {
00276 return job->assigned.data_size;
00277 }
00278
00279 void *gearman_job_take_workload(gearman_job_st *job, size_t *data_size)
00280 {
00281 return gearman_packet_take_data(&(job->assigned), data_size);
00282 }
00283
00284
00285
00286
00287
00288 static gearman_return_t _job_send(gearman_job_st *job)
00289 {
00290 gearman_return_t ret;
00291
00292 ret= gearman_connection_send(job->con, &(job->work), true);
00293 if (ret != GEARMAN_SUCCESS)
00294 return ret;
00295
00296 gearman_packet_free(&(job->work));
00297 job->options.work_in_use= false;
00298
00299 return GEARMAN_SUCCESS;
00300 }