00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libsqlite3.h>
00017 #include <sqlite3.h>
00018
/** Table name used when the "table" option is not supplied. */
#define GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE "gearman_queue"

/** Fixed number of bytes added when sizing the reusable query buffer. */
#define GEARMAN_QUEUE_QUERY_BUFFER 256

/** Capacity, terminating NUL included, of the stored table name. */
#define SQLITE_MAX_TABLE_SIZE 256

/** Capacity of the stack buffer that holds the CREATE TABLE statement. */
#define SQLITE_MAX_CREATE_TABLE_SIZE 1024
00032
00036 typedef struct
00037 {
00038 sqlite3* db;
00039 char table[SQLITE_MAX_TABLE_SIZE];
00040 char *query;
00041 size_t query_size;
00042 int in_trans;
00043 } gearman_queue_sqlite_st;
00044
00048 static int _sqlite_query(gearman_server_st *server,
00049 gearman_queue_sqlite_st *queue,
00050 const char *query, size_t query_size,
00051 sqlite3_stmt ** sth);
00052 static int _sqlite_lock(gearman_server_st *server,
00053 gearman_queue_sqlite_st *queue);
00054 static int _sqlite_commit(gearman_server_st *server,
00055 gearman_queue_sqlite_st *queue);
00056 static int _sqlite_rollback(gearman_server_st *server,
00057 gearman_queue_sqlite_st *queue);
00058
00059
00060 static gearman_return_t _sqlite_add(gearman_server_st *server, void *context,
00061 const void *unique, size_t unique_size,
00062 const void *function_name,
00063 size_t function_name_size,
00064 const void *data, size_t data_size,
00065 gearman_job_priority_t priority);
00066 static gearman_return_t _sqlite_flush(gearman_server_st *server, void *context);
00067 static gearman_return_t _sqlite_done(gearman_server_st *server, void *context,
00068 const void *unique,
00069 size_t unique_size,
00070 const void *function_name,
00071 size_t function_name_size);
00072 static gearman_return_t _sqlite_replay(gearman_server_st *server, void *context,
00073 gearman_queue_add_fn *add_fn,
00074 void *add_context);
00075
00078
00079
00080
00081
00082 gearman_return_t gearman_server_queue_libsqlite3_conf(gearman_conf_st *conf)
00083 {
00084 gearman_conf_module_st *module;
00085
00086 module= gearman_conf_module_create(conf, NULL, "libsqlite3");
00087 if (module == NULL)
00088 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00089
00090 #define MCO(__name, __value, __help) \
00091 gearman_conf_module_add_option(module, __name, 0, __value, __help);
00092
00093 MCO("db", "DB", "Database file to use.")
00094 MCO("table", "TABLE", "Table to use.")
00095
00096 return gearman_conf_return(conf);
00097 }
00098
00099 gearman_return_t gearman_server_queue_libsqlite3_init(gearman_server_st *server,
00100 gearman_conf_st *conf)
00101 {
00102 gearman_queue_sqlite_st *queue;
00103 gearman_conf_module_st *module;
00104 const char *name;
00105 const char *value;
00106 char *table= NULL;
00107 const char *query;
00108 sqlite3_stmt* sth;
00109 char create[SQLITE_MAX_CREATE_TABLE_SIZE];
00110
00111 gearman_log_info(server->gearman, "Initializing libsqlite3 module");
00112
00113 queue= calloc(1, sizeof(gearman_queue_sqlite_st));
00114 if (queue == NULL)
00115 {
00116 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init", "malloc");
00117 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00118 }
00119
00120 snprintf(queue->table, SQLITE_MAX_TABLE_SIZE,
00121 GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE);
00122
00123
00124 module= gearman_conf_module_find(conf, "libsqlite3");
00125 if (module == NULL)
00126 {
00127 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00128 "gearman_conf_module_find:NULL");
00129 free(queue);
00130
00131 return GEARMAN_QUEUE_ERROR;
00132 }
00133
00134 gearman_server_set_queue_context(server, queue);
00135
00136 while (gearman_conf_module_value(module, &name, &value))
00137 {
00138 if (! strcmp(name, "db"))
00139 {
00140 if (sqlite3_open(value, &(queue->db)) != SQLITE_OK)
00141 {
00142 gearman_server_queue_libsqlite3_deinit(server);
00143 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00144 "Can't open database: %s\n",
00145 sqlite3_errmsg(queue->db));
00146 free(queue);
00147 return GEARMAN_QUEUE_ERROR;
00148 }
00149 }
00150 else if (! strcmp(name, "table"))
00151 {
00152 snprintf(queue->table, SQLITE_MAX_TABLE_SIZE, "%s", value);
00153 }
00154 else
00155 {
00156 gearman_server_queue_libsqlite3_deinit(server);
00157 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00158 "Unknown argument: %s", name);
00159 return GEARMAN_QUEUE_ERROR;
00160 }
00161 }
00162
00163 if (! queue->db)
00164 {
00165 gearman_server_queue_libsqlite3_deinit(server);
00166 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00167 "missing required --libsqlite3-db=<dbfile> argument");
00168 return GEARMAN_QUEUE_ERROR;
00169 }
00170
00171 query= "SELECT name FROM sqlite_master WHERE type='table'";
00172 if (_sqlite_query(server, queue, query, strlen(query), &sth) != SQLITE_OK)
00173 {
00174 gearman_server_queue_libsqlite3_deinit(server);
00175 return GEARMAN_QUEUE_ERROR;
00176 }
00177
00178 while (sqlite3_step(sth) == SQLITE_ROW)
00179 {
00180 if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
00181 table= (char*)sqlite3_column_text(sth, 0);
00182 else
00183 {
00184 sqlite3_finalize(sth);
00185 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00186 "column %d is not type TEXT", 0);
00187 return GEARMAN_QUEUE_ERROR;
00188 }
00189
00190 if (! strcasecmp(queue->table, table))
00191 {
00192 gearman_log_info(server->gearman, "sqlite module using table '%s'", table);
00193 break;
00194 }
00195 }
00196
00197 if (sqlite3_finalize(sth) != SQLITE_OK)
00198 {
00199 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00200 "sqlite_finalize:%s", sqlite3_errmsg(queue->db));
00201 gearman_server_queue_libsqlite3_deinit(server);
00202 return GEARMAN_QUEUE_ERROR;
00203 }
00204
00205 if (table == NULL)
00206 {
00207 snprintf(create, SQLITE_MAX_CREATE_TABLE_SIZE,
00208 "CREATE TABLE %s"
00209 "("
00210 "unique_key TEXT PRIMARY KEY,"
00211 "function_name TEXT,"
00212 "priority INTEGER,"
00213 "data BLOB"
00214 ")",
00215 queue->table);
00216
00217 gearman_log_info(server->gearman, "sqlite module creating table '%s'", queue->table);
00218
00219 if (_sqlite_query(server, queue, create, strlen(create), &sth)
00220 != SQLITE_OK)
00221 {
00222 gearman_server_queue_libsqlite3_deinit(server);
00223 return GEARMAN_QUEUE_ERROR;
00224 }
00225
00226 if (sqlite3_step(sth) != SQLITE_DONE)
00227 {
00228 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00229 "create table error: %s",
00230 sqlite3_errmsg(queue->db));
00231 sqlite3_finalize(sth);
00232 return GEARMAN_QUEUE_ERROR;
00233 }
00234
00235 if (sqlite3_finalize(sth) != SQLITE_OK)
00236 {
00237 gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00238 "sqlite_finalize:%s",
00239 sqlite3_errmsg(queue->db));
00240 gearman_server_queue_libsqlite3_deinit(server);
00241 return GEARMAN_QUEUE_ERROR;
00242 }
00243 }
00244
00245 gearman_server_set_queue_add_fn(server, _sqlite_add);
00246 gearman_server_set_queue_flush_fn(server, _sqlite_flush);
00247 gearman_server_set_queue_done_fn(server, _sqlite_done);
00248 gearman_server_set_queue_replay_fn(server, _sqlite_replay);
00249
00250 return GEARMAN_SUCCESS;
00251 }
00252
00253 gearman_return_t
00254 gearman_server_queue_libsqlite3_deinit(gearman_server_st *server)
00255 {
00256 gearman_queue_sqlite_st *queue;
00257
00258 gearman_log_info(server->gearman, "Shutting down sqlite queue module");
00259
00260 queue= (gearman_queue_sqlite_st *)gearman_server_queue_context(server);
00261 gearman_server_set_queue_context(server, NULL);
00262 sqlite3_close(queue->db);
00263 if (queue->query != NULL)
00264 free(queue->query);
00265 free(queue);
00266
00267 return GEARMAN_SUCCESS;
00268 }
00269
00270 gearman_return_t gearmand_queue_libsqlite3_init(gearmand_st *gearmand,
00271 gearman_conf_st *conf)
00272 {
00273 return gearman_server_queue_libsqlite3_init(&(gearmand->server), conf);
00274 }
00275
00276 gearman_return_t gearmand_queue_libsqlite3_deinit(gearmand_st *gearmand)
00277 {
00278 return gearman_server_queue_libsqlite3_deinit(&(gearmand->server));
00279 }
00280
00281
00282
00283
00284
00285 int _sqlite_query(gearman_server_st *server,
00286 gearman_queue_sqlite_st *queue,
00287 const char *query, size_t query_size,
00288 sqlite3_stmt ** sth)
00289 {
00290 int ret;
00291
00292 if (query_size > UINT32_MAX)
00293 {
00294 gearman_log_error(server->gearman, "_sqlite_query", "query size too big [%u]",
00295 (uint32_t)query_size);
00296 return SQLITE_ERROR;
00297 }
00298
00299 gearman_log_crazy(server->gearman, "sqlite query: %s", query);
00300 ret= sqlite3_prepare(queue->db, query, (int)query_size, sth, NULL);
00301 if (ret != SQLITE_OK)
00302 {
00303 if (*sth)
00304 {
00305 sqlite3_finalize(*sth);
00306 }
00307 *sth= NULL;
00308 gearman_log_error(server->gearman, "_sqlite_query", "sqlite_prepare:%s",
00309 sqlite3_errmsg(queue->db));
00310 }
00311
00312 return ret;
00313 }
00314
00315 int _sqlite_lock(gearman_server_st *server,
00316 gearman_queue_sqlite_st *queue)
00317 {
00318 sqlite3_stmt* sth;
00319 int ret;
00320 if (queue->in_trans)
00321 {
00322
00323 return SQLITE_OK;
00324 }
00325
00326 ret= _sqlite_query(server, queue, "BEGIN TRANSACTION",
00327 sizeof("BEGIN TRANSACTION") - 1, &sth);
00328 if (ret != SQLITE_OK)
00329 {
00330 gearman_log_error(server->gearman, "_sqlite_lock",
00331 "failed to begin transaction: %s",
00332 sqlite3_errmsg(queue->db));
00333 if (sth)
00334 {
00335 sqlite3_finalize(sth);
00336 }
00337
00338 return ret;
00339 }
00340
00341 ret= sqlite3_step(sth);
00342 if (ret != SQLITE_DONE)
00343 {
00344 gearman_log_error(server->gearman, "_sqlite_lock", "lock error: %s",
00345 sqlite3_errmsg(queue->db));
00346 sqlite3_finalize(sth);
00347 return ret;
00348 }
00349
00350 sqlite3_finalize(sth);
00351 queue->in_trans++;
00352
00353 return SQLITE_OK;
00354 }
00355
00356 int _sqlite_commit(gearman_server_st *server,
00357 gearman_queue_sqlite_st *queue)
00358 {
00359 sqlite3_stmt* sth;
00360 int ret;
00361
00362 if (! queue->in_trans)
00363 {
00364
00365 return SQLITE_OK;
00366 }
00367
00368 ret= _sqlite_query(server, queue, "COMMIT", sizeof("COMMIT") - 1, &sth);
00369 if (ret != SQLITE_OK)
00370 {
00371 gearman_log_error(server->gearman, "_sqlite_commit",
00372 "failed to commit transaction: %s",
00373 sqlite3_errmsg(queue->db));
00374 if (sth)
00375 {
00376 sqlite3_finalize(sth);
00377 }
00378
00379 return ret;
00380 }
00381 ret= sqlite3_step(sth);
00382 if (ret != SQLITE_DONE)
00383 {
00384 gearman_log_error(server->gearman, "_sqlite_commit", "commit error: %s",
00385 sqlite3_errmsg(queue->db));
00386 sqlite3_finalize(sth);
00387 return ret;
00388 }
00389 sqlite3_finalize(sth);
00390 queue->in_trans= 0;
00391 return SQLITE_OK;
00392 }
00393
00394 int _sqlite_rollback(gearman_server_st *server,
00395 gearman_queue_sqlite_st *queue)
00396 {
00397 sqlite3_stmt* sth;
00398 int ret;
00399 const char* query;
00400
00401 if (! queue->in_trans)
00402 {
00403
00404 return SQLITE_OK;
00405 }
00406
00407 query= "ROLLBACK";
00408 ret= _sqlite_query(server, queue, query, strlen(query), &sth);
00409 if (ret != SQLITE_OK)
00410 {
00411 gearman_log_error(server->gearman, "_sqlite_rollback",
00412 "failed to rollback transaction: %s",
00413 sqlite3_errmsg(queue->db));
00414 if (sth)
00415 {
00416 sqlite3_finalize(sth);
00417 }
00418
00419 return ret;
00420 }
00421 ret= sqlite3_step(sth);
00422 if (ret != SQLITE_DONE)
00423 {
00424 gearman_log_error(server->gearman, "_sqlite_rollback", "rollback error: %s",
00425 sqlite3_errmsg(queue->db));
00426 sqlite3_finalize(sth);
00427 return ret;
00428 }
00429 sqlite3_finalize(sth);
00430 queue->in_trans= 0;
00431
00432 return SQLITE_OK;
00433 }
00434
00435 static gearman_return_t _sqlite_add(gearman_server_st *server, void *context,
00436 const void *unique, size_t unique_size,
00437 const void *function_name,
00438 size_t function_name_size,
00439 const void *data, size_t data_size,
00440 gearman_job_priority_t priority)
00441 {
00442 gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00443 char *query;
00444 size_t query_size;
00445 sqlite3_stmt* sth;
00446
00447 if (unique_size > UINT32_MAX || function_name_size > UINT32_MAX ||
00448 data_size > UINT32_MAX)
00449 {
00450 gearman_log_error(server->gearman, "_sqlite_add", "size too big [%u]",
00451 (uint32_t)unique_size);
00452 return SQLITE_ERROR;
00453 }
00454
00455 gearman_log_debug(server->gearman, "sqlite add: %.*s", (uint32_t)unique_size, (char *)unique);
00456
00457 if (_sqlite_lock(server, queue) != SQLITE_OK)
00458 return GEARMAN_QUEUE_ERROR;
00459
00460 query_size= ((unique_size + function_name_size + data_size) * 2) +
00461 GEARMAN_QUEUE_QUERY_BUFFER;
00462 if (query_size > queue->query_size)
00463 {
00464 query= realloc(queue->query, query_size);
00465 if (query == NULL)
00466 {
00467 gearman_log_error(server->gearman, "_sqlite_add", "realloc");
00468 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00469 }
00470
00471 queue->query= query;
00472 queue->query_size= query_size;
00473 }
00474 else
00475 {
00476 query= queue->query;
00477 }
00478
00479 query_size= (size_t)snprintf(query, query_size,
00480 "INSERT OR REPLACE INTO %s (priority,unique_key,"
00481 "function_name,data) VALUES (?,?,?,?)",
00482 queue->table);
00483
00484 if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00485 return GEARMAN_QUEUE_ERROR;
00486
00487 if (sqlite3_bind_int(sth, 1, priority) != SQLITE_OK)
00488 {
00489 _sqlite_rollback(server, queue);
00490 gearman_log_error(server->gearman, "_sqlite_add", "failed to bind int [%d]: %s", priority, sqlite3_errmsg(queue->db));
00491 sqlite3_finalize(sth);
00492
00493 return GEARMAN_QUEUE_ERROR;
00494 }
00495
00496 if (sqlite3_bind_text(sth, 2, unique, (int)unique_size,
00497 SQLITE_TRANSIENT) != SQLITE_OK)
00498 {
00499 _sqlite_rollback(server, queue);
00500 gearman_log_error(server->gearman, "_sqlite_add",
00501 "failed to bind text [%.*s]: %s",
00502 (uint32_t)unique_size, (char*)unique,
00503 sqlite3_errmsg(queue->db));
00504 sqlite3_finalize(sth);
00505 return GEARMAN_QUEUE_ERROR;
00506 }
00507
00508 if (sqlite3_bind_text(sth, 3, function_name, (int)function_name_size,
00509 SQLITE_TRANSIENT) != SQLITE_OK)
00510 {
00511 _sqlite_rollback(server, queue);
00512 gearman_log_error(server->gearman, "_sqlite_add",
00513 "failed to bind text [%.*s]: %s",
00514 (uint32_t)function_name_size, (char*)function_name,
00515 sqlite3_errmsg(queue->db));
00516 sqlite3_finalize(sth);
00517 return GEARMAN_QUEUE_ERROR;
00518 }
00519
00520 if (sqlite3_bind_blob(sth, 4, data, (int)data_size,
00521 SQLITE_TRANSIENT) != SQLITE_OK)
00522 {
00523 _sqlite_rollback(server, queue);
00524 gearman_log_error(server->gearman, "_sqlite_add", "failed to bind blob: %s",
00525 sqlite3_errmsg(queue->db));
00526 sqlite3_finalize(sth);
00527 return GEARMAN_QUEUE_ERROR;
00528 }
00529
00530 if (sqlite3_step(sth) != SQLITE_DONE)
00531 {
00532 gearman_log_error(server->gearman, "_sqlite_add", "insert error: %s",
00533 sqlite3_errmsg(queue->db));
00534 if (sqlite3_finalize(sth) != SQLITE_OK )
00535 {
00536 gearman_log_error(server->gearman, "_sqlite_add", "finalize error: %s",
00537 sqlite3_errmsg(queue->db));
00538 }
00539
00540 return GEARMAN_QUEUE_ERROR;
00541 }
00542
00543 gearman_log_crazy(server->gearman,
00544 "sqlite data: priority: %d, unique_key: %s, function_name: %s",
00545 priority, (char*)unique, (char*)function_name);
00546
00547 sqlite3_finalize(sth);
00548
00549 if (_sqlite_commit(server, queue) != SQLITE_OK)
00550 return GEARMAN_QUEUE_ERROR;
00551
00552 return GEARMAN_SUCCESS;
00553 }
00554
00555 static gearman_return_t _sqlite_flush(gearman_server_st *server,
00556 void *context __attribute__((unused)))
00557 {
00558 gearman_log_debug(server->gearman, "sqlite flush");
00559
00560 return GEARMAN_SUCCESS;
00561 }
00562
00563 static gearman_return_t _sqlite_done(gearman_server_st *server, void *context,
00564 const void *unique,
00565 size_t unique_size,
00566 const void *function_name __attribute__((unused)),
00567 size_t function_name_size __attribute__((unused)))
00568 {
00569 gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00570 char *query;
00571 size_t query_size;
00572 sqlite3_stmt* sth;
00573
00574 if (unique_size > UINT32_MAX)
00575 {
00576 gearman_log_error(server->gearman,
00577 "_sqlite_query", "unique key size too big [%u]",
00578 (uint32_t)unique_size);
00579 return SQLITE_ERROR;
00580 }
00581
00582 gearman_log_debug(server->gearman, "sqlite done: %.*s", (uint32_t)unique_size, (char *)unique);
00583
00584 if (_sqlite_lock(server, queue) != SQLITE_OK)
00585 return GEARMAN_QUEUE_ERROR;
00586
00587 query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00588 if (query_size > queue->query_size)
00589 {
00590 query= realloc(queue->query, query_size);
00591 if (query == NULL)
00592 {
00593 gearman_log_error(server->gearman, "_sqlite_add", "realloc");
00594 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00595 }
00596
00597 queue->query= query;
00598 queue->query_size= query_size;
00599 }
00600 else
00601 query= queue->query;
00602
00603 query_size= (size_t)snprintf(query, query_size,
00604 "DELETE FROM %s WHERE unique_key=?",
00605 queue->table);
00606
00607 if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00608 return GEARMAN_QUEUE_ERROR;
00609
00610 sqlite3_bind_text(sth, 1, unique, (int)unique_size, SQLITE_TRANSIENT);
00611
00612 if (sqlite3_step(sth) != SQLITE_DONE)
00613 {
00614 gearman_log_error(server->gearman, "_sqlite_done", "delete error: %s",
00615 sqlite3_errmsg(queue->db));
00616 sqlite3_finalize(sth);
00617 return GEARMAN_QUEUE_ERROR;
00618 }
00619
00620 sqlite3_finalize(sth);
00621
00622 if (_sqlite_commit(server, queue) != SQLITE_OK)
00623 return GEARMAN_QUEUE_ERROR;
00624
00625 return GEARMAN_SUCCESS;
00626 }
00627
00628 static gearman_return_t _sqlite_replay(gearman_server_st *server, void *context,
00629 gearman_queue_add_fn *add_fn,
00630 void *add_context)
00631 {
00632 gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00633 char *query;
00634 size_t query_size;
00635 sqlite3_stmt* sth;
00636 gearman_return_t gret;
00637
00638 gearman_log_info(server->gearman, "sqlite replay start");
00639
00640 if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00641 {
00642 query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00643 if (query == NULL)
00644 {
00645 gearman_log_error(server->gearman, "_sqlite_replay", "realloc");
00646 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00647 }
00648
00649 queue->query= query;
00650 queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00651 }
00652 else
00653 query= queue->query;
00654
00655 query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00656 "SELECT unique_key,function_name,priority,data "
00657 "FROM %s",
00658 queue->table);
00659
00660 if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00661 return GEARMAN_QUEUE_ERROR;
00662 while (sqlite3_step(sth) == SQLITE_ROW)
00663 {
00664 const void *unique, *function_name;
00665 void *data;
00666 size_t unique_size, function_name_size, data_size;
00667 gearman_job_priority_t priority;
00668
00669 if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
00670 {
00671 unique= sqlite3_column_text(sth,0);
00672 unique_size= (size_t) sqlite3_column_bytes(sth,0);
00673 }
00674 else
00675 {
00676 sqlite3_finalize(sth);
00677 gearman_log_error(server->gearman, "_sqlite_replay",
00678 "column %d is not type TEXT", 0);
00679 return GEARMAN_QUEUE_ERROR;
00680 }
00681
00682 if (sqlite3_column_type(sth,1) == SQLITE_TEXT)
00683 {
00684 function_name= sqlite3_column_text(sth,1);
00685 function_name_size= (size_t)sqlite3_column_bytes(sth,1);
00686 }
00687 else
00688 {
00689 sqlite3_finalize(sth);
00690 gearman_log_error(server->gearman, "_sqlite_replay",
00691 "column %d is not type TEXT", 1);
00692 return GEARMAN_QUEUE_ERROR;
00693 }
00694
00695 if (sqlite3_column_type(sth,2) == SQLITE_INTEGER)
00696 {
00697 priority= (double)sqlite3_column_int64(sth,2);
00698 }
00699 else
00700 {
00701 sqlite3_finalize(sth);
00702 gearman_log_error(server->gearman, "_sqlite_replay",
00703 "column %d is not type INTEGER", 2);
00704 return GEARMAN_QUEUE_ERROR;
00705 }
00706
00707 if (sqlite3_column_type(sth,3) == SQLITE_BLOB)
00708 {
00709 data_size= (size_t)sqlite3_column_bytes(sth,3);
00710
00711 data= malloc(data_size);
00712 if (data == NULL)
00713 {
00714 sqlite3_finalize(sth);
00715 gearman_log_error(server->gearman, "_sqlite_replay", "malloc");
00716 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00717 }
00718 memcpy(data, sqlite3_column_blob(sth,3), data_size);
00719 }
00720 else
00721 {
00722 sqlite3_finalize(sth);
00723 gearman_log_error(server->gearman, "_sqlite_replay", "column %d is not type TEXT", 3);
00724 return GEARMAN_QUEUE_ERROR;
00725 }
00726
00727 gearman_log_debug(server->gearman, "sqlite replay: %s", (char*)function_name);
00728
00729 gret= (*add_fn)(server, add_context,
00730 unique, unique_size,
00731 function_name, function_name_size,
00732 data, data_size,
00733 priority);
00734 if (gret != GEARMAN_SUCCESS)
00735 {
00736 sqlite3_finalize(sth);
00737 return gret;
00738 }
00739 }
00740
00741 sqlite3_finalize(sth);
00742
00743 return GEARMAN_SUCCESS;
00744 }