00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libpq.h>
00017
00018 #if defined(HAVE_LIBPQ_FE_H)
00019 # include <libpq-fe.h>
00020 # include <pg_config_manual.h>
00021 #else
00022 # include <postgresql/libpq-fe.h>
00023 # include <postgresql/pg_config_manual.h>
00024 #endif
00025
00035 #define GEARMAN_QUEUE_LIBPQ_DEFAULT_TABLE "queue"
00036 #define GEARMAN_QUEUE_QUERY_BUFFER 256
00037
00041 typedef struct
00042 {
00043 PGconn *con;
00044 char table[NAMEDATALEN];
00045 char *query;
00046 size_t query_size;
00047 } gearman_queue_libpq_st;
00048
00052 static void _libpq_notice_processor(void *arg, const char *message);
00053
00054
00055 static gearman_return_t _libpq_add(gearman_server_st *server, void *context,
00056 const void *unique, size_t unique_size,
00057 const void *function_name,
00058 size_t function_name_size,
00059 const void *data, size_t data_size,
00060 gearman_job_priority_t priority);
00061 static gearman_return_t _libpq_flush(gearman_server_st *server, void *context);
00062 static gearman_return_t _libpq_done(gearman_server_st *server, void *context,
00063 const void *unique,
00064 size_t unique_size,
00065 const void *function_name,
00066 size_t function_name_size);
00067 static gearman_return_t _libpq_replay(gearman_server_st *server, void *context,
00068 gearman_queue_add_fn *add_fn,
00069 void *add_context);
00070
00073
00074
00075
00076
00077 gearman_return_t gearman_server_queue_libpq_conf(gearman_conf_st *conf)
00078 {
00079 gearman_conf_module_st *module;
00080
00081 module= gearman_conf_module_create(conf, NULL, "libpq");
00082 if (module == NULL)
00083 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00084
00085 #define MCO(__name, __value, __help) \
00086 gearman_conf_module_add_option(module, __name, 0, __value, __help);
00087
00088 MCO("conninfo", "STRING", "PostgreSQL connection information string.")
00089 MCO("table", "TABLE", "Table to use.")
00090
00091 return gearman_conf_return(conf);
00092 }
00093
00094 gearman_return_t gearman_server_queue_libpq_init(gearman_server_st *server,
00095 gearman_conf_st *conf)
00096 {
00097 gearman_queue_libpq_st *queue;
00098 gearman_conf_module_st *module;
00099 const char *name;
00100 const char *value;
00101 const char *conninfo= "";
00102 char create[1024];
00103 PGresult *result;
00104
00105 gearman_log_info(server->gearman, "Initializing libpq module");
00106
00107 queue= malloc(sizeof(gearman_queue_libpq_st));
00108 if (queue == NULL)
00109 {
00110 gearman_log_error(server->gearman, "gearman_queue_libpq_init", "malloc");
00111 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00112 }
00113
00114 memset(queue, 0, sizeof(gearman_queue_libpq_st));
00115 snprintf(queue->table, NAMEDATALEN, GEARMAN_QUEUE_LIBPQ_DEFAULT_TABLE);
00116
00117 gearman_server_set_queue_context(server, queue);
00118
00119
00120 module= gearman_conf_module_find(conf, "libpq");
00121 if (module == NULL)
00122 {
00123 gearman_log_error(server->gearman, "gearman_queue_libpq_init", "gearman_conf_module_find:NULL");
00124 return GEARMAN_QUEUE_ERROR;
00125 }
00126
00127 while (gearman_conf_module_value(module, &name, &value))
00128 {
00129 if (!strcmp(name, "conninfo"))
00130 conninfo= value;
00131 else if (!strcmp(name, "table"))
00132 snprintf(queue->table, NAMEDATALEN, "%s", value);
00133 else
00134 {
00135 gearman_server_queue_libpq_deinit(server);
00136 gearman_log_error(server->gearman, "gearman_queue_libpq_init", "Unknown argument: %s", name);
00137 return GEARMAN_QUEUE_ERROR;
00138 }
00139 }
00140
00141 queue->con= PQconnectdb(conninfo);
00142
00143 if (queue->con == NULL || PQstatus(queue->con) != CONNECTION_OK)
00144 {
00145 gearman_log_error(server->gearman, "gearman_queue_libpq_init",
00146 "PQconnectdb: %s", PQerrorMessage(queue->con));
00147 gearman_server_queue_libpq_deinit(server);
00148 return GEARMAN_QUEUE_ERROR;
00149 }
00150
00151 (void)PQsetNoticeProcessor(queue->con, _libpq_notice_processor, server);
00152
00153 snprintf(create, 1024, "SELECT tablename FROM pg_tables WHERE tablename='%s'",
00154 queue->table);
00155
00156 result= PQexec(queue->con, create);
00157 if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
00158 {
00159 gearman_log_error(server->gearman, "gearman_queue_libpq_init", "PQexec:%s",
00160 PQerrorMessage(queue->con));
00161 PQclear(result);
00162 gearman_server_queue_libpq_deinit(server);
00163 return GEARMAN_QUEUE_ERROR;
00164 }
00165
00166 if (PQntuples(result) == 0)
00167 {
00168 PQclear(result);
00169
00170 snprintf(create, 1024,
00171 "CREATE TABLE %s"
00172 "("
00173 "unique_key VARCHAR(%d) PRIMARY KEY,"
00174 "function_name VARCHAR(255),"
00175 "priority INTEGER,"
00176 "data BYTEA"
00177 ")",
00178 queue->table, GEARMAN_UNIQUE_SIZE);
00179
00180 gearman_log_info(server->gearman, "libpq module creating table '%s'", queue->table);
00181
00182 result= PQexec(queue->con, create);
00183 if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00184 {
00185 gearman_log_error(server->gearman, "gearman_queue_libpq_init", "PQexec:%s",
00186 PQerrorMessage(queue->con));
00187 PQclear(result);
00188 gearman_server_queue_libpq_deinit(server);
00189 return GEARMAN_QUEUE_ERROR;
00190 }
00191
00192 PQclear(result);
00193 }
00194 else
00195 PQclear(result);
00196
00197 gearman_server_set_queue_add_fn(server, _libpq_add);
00198 gearman_server_set_queue_flush_fn(server, _libpq_flush);
00199 gearman_server_set_queue_done_fn(server, _libpq_done);
00200 gearman_server_set_queue_replay_fn(server, _libpq_replay);
00201
00202 return GEARMAN_SUCCESS;
00203 }
00204
00205 gearman_return_t gearman_server_queue_libpq_deinit(gearman_server_st *server)
00206 {
00207 gearman_queue_libpq_st *queue;
00208
00209 gearman_log_info(server->gearman, "Shutting down libpq queue module");
00210
00211 queue= (gearman_queue_libpq_st *)gearman_server_queue_context(server);
00212 gearman_server_set_queue_context(server, NULL);
00213
00214 if (queue->con != NULL)
00215 PQfinish(queue->con);
00216
00217 if (queue->query != NULL)
00218 free(queue->query);
00219
00220 free(queue);
00221
00222 return GEARMAN_SUCCESS;
00223 }
00224
00225 gearman_return_t gearmand_queue_libpq_init(gearmand_st *gearmand,
00226 gearman_conf_st *conf)
00227 {
00228 return gearman_server_queue_libpq_init(&(gearmand->server), conf);
00229 }
00230
00231 gearman_return_t gearmand_queue_libpq_deinit(gearmand_st *gearmand)
00232 {
00233 return gearman_server_queue_libpq_deinit(&(gearmand->server));
00234 }
00235
00236
00237
00238
00239
00240 static void _libpq_notice_processor(void *arg, const char *message)
00241 {
00242 gearman_server_st *server= (gearman_server_st *)arg;
00243 gearman_log_info(server->gearman, "PostgreSQL %s", message);
00244 }
00245
00246 static gearman_return_t _libpq_add(gearman_server_st *server, void *context,
00247 const void *unique, size_t unique_size,
00248 const void *function_name,
00249 size_t function_name_size,
00250 const void *data, size_t data_size,
00251 gearman_job_priority_t priority)
00252 {
00253 gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00254 char *query;
00255 size_t query_size;
00256 PGresult *result;
00257
00258 const char *param_values[3]= { (char *)unique,
00259 (char *)function_name,
00260 (char *)data };
00261 int param_lengths[3]= { (int)unique_size,
00262 (int)function_name_size,
00263 (int)data_size };
00264 int param_formats[3]= { 0, 0, 1 };
00265
00266 gearman_log_debug(server->gearman, "libpq add: %.*s", (uint32_t)unique_size, (char *)unique);
00267
00268
00269
00270 #if 0
00271 if (!not started)
00272 {
00273 if (_query(pq, "BEGIN", 5) != PQ_RETURN_OK)
00274 return REPQ_RETURN_EXTERNAL;
00275
00276 pq_result_free(&(pq->result));
00277 }
00278 #endif
00279
00280 query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00281 if (query_size > queue->query_size)
00282 {
00283 query= realloc(queue->query, query_size);
00284 if (query == NULL)
00285 {
00286 gearman_log_error(server->gearman, "_libpq_add", "realloc");
00287 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00288 }
00289
00290 queue->query= query;
00291 queue->query_size= query_size;
00292 }
00293 else
00294 query= queue->query;
00295
00296 (void)snprintf(query, query_size,
00297 "INSERT INTO %s (priority,unique_key,function_name,data) "
00298 "VALUES(%u,$1,$2,$3)", queue->table, (uint32_t)priority);
00299 result= PQexecParams(queue->con, query, 3, NULL, param_values, param_lengths,
00300 param_formats, 0);
00301 if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00302 {
00303 gearman_log_error(server->gearman, "_libpq_command", "PQexec:%s", PQerrorMessage(queue->con));
00304 PQclear(result);
00305 return GEARMAN_QUEUE_ERROR;
00306 }
00307
00308 PQclear(result);
00309
00310 return GEARMAN_SUCCESS;
00311 }
00312
00313 static gearman_return_t _libpq_flush(gearman_server_st *server,
00314 void *context __attribute__((unused)))
00315 {
00316 gearman_log_debug(server->gearman, "libpq flush");
00317
00318 return GEARMAN_SUCCESS;
00319 }
00320
00321 static gearman_return_t _libpq_done(gearman_server_st *server, void *context,
00322 const void *unique,
00323 size_t unique_size,
00324 const void *function_name __attribute__((unused)),
00325 size_t function_name_size __attribute__((unused)))
00326 {
00327 gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00328 char *query;
00329 size_t query_size;
00330 PGresult *result;
00331
00332 gearman_log_debug(server->gearman, "libpq done: %.*s", (uint32_t)unique_size, (char *)unique);
00333
00334 query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00335 if (query_size > queue->query_size)
00336 {
00337 query= realloc(queue->query, query_size);
00338 if (query == NULL)
00339 {
00340 gearman_log_error(server->gearman, "_libpq_add", "realloc");
00341 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00342 }
00343
00344 queue->query= query;
00345 queue->query_size= query_size;
00346 }
00347 else
00348 query= queue->query;
00349
00350 query_size= (size_t)snprintf(query, query_size,
00351 "DELETE FROM %s WHERE unique_key='",
00352 queue->table);
00353
00354 query_size+= PQescapeStringConn(queue->con, query + query_size, unique,
00355 unique_size, NULL);
00356 memcpy(query + query_size, "'", 1);
00357 query_size+= 1;
00358 query[query_size]= 0;
00359
00360 result= PQexec(queue->con, query);
00361 if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00362 {
00363 gearman_log_error(server->gearman, "_libpq_add", "PQexec:%s", PQerrorMessage(queue->con));
00364 PQclear(result);
00365 return GEARMAN_QUEUE_ERROR;
00366 }
00367
00368 PQclear(result);
00369
00370 return GEARMAN_SUCCESS;
00371 }
00372
00373 static gearman_return_t _libpq_replay(gearman_server_st *server, void *context,
00374 gearman_queue_add_fn *add_fn,
00375 void *add_context)
00376 {
00377 gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00378 char *query;
00379 gearman_return_t ret;
00380 PGresult *result;
00381 int row;
00382 void *data;
00383
00384 gearman_log_info(server->gearman, "libpq replay start");
00385
00386 if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00387 {
00388 query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00389 if (query == NULL)
00390 {
00391 gearman_log_error(server->gearman, "_libpq_replay", "realloc");
00392 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00393 }
00394
00395 queue->query= query;
00396 queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00397 }
00398 else
00399 query= queue->query;
00400
00401 (void)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00402 "SELECT unique_key,function_name,priority,data FROM %s",
00403 queue->table);
00404
00405 result= PQexecParams(queue->con, query, 0, NULL, NULL, NULL, NULL, 1);
00406 if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
00407 {
00408 gearman_log_error(server->gearman, "_libpq_replay", "PQexecParams:%s", PQerrorMessage(queue->con));
00409 PQclear(result);
00410 return GEARMAN_QUEUE_ERROR;
00411 }
00412
00413 for (row= 0; row < PQntuples(result); row++)
00414 {
00415 gearman_log_debug(server->gearman, "libpq replay: %.*s",
00416 PQgetlength(result, row, 0),
00417 PQgetvalue(result, row, 0));
00418
00419 if (PQgetlength(result, row, 3) == 0)
00420 data= NULL;
00421 else
00422 {
00423 data= malloc((size_t)PQgetlength(result, row, 3));
00424 if (query == NULL)
00425 {
00426 PQclear(result);
00427 gearman_log_error(server->gearman, "_libpq_replay", "malloc");
00428 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00429 }
00430
00431 memcpy(data, PQgetvalue(result, row, 3),
00432 (size_t)PQgetlength(result, row, 3));
00433 }
00434
00435 ret= (*add_fn)(server, add_context, PQgetvalue(result, row, 0),
00436 (size_t)PQgetlength(result, row, 0),
00437 PQgetvalue(result, row, 1),
00438 (size_t)PQgetlength(result, row, 1),
00439 data, (size_t)PQgetlength(result, row, 3),
00440 atoi(PQgetvalue(result, row, 2)));
00441 if (ret != GEARMAN_SUCCESS)
00442 {
00443 PQclear(result);
00444 return ret;
00445 }
00446 }
00447
00448 PQclear(result);
00449
00450 return GEARMAN_SUCCESS;
00451 }