00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 static void gearman_connection_reset_addrinfo(gearman_connection_st *connection);
00017
00018
00026 gearman_connection_st *gearman_connection_create(gearman_universal_st *gearman,
00027 gearman_connection_st *connection,
00028 gearman_connection_options_t *options)
00029 {
00030 if (connection == NULL)
00031 {
00032 connection= malloc(sizeof(gearman_connection_st));
00033 if (connection == NULL)
00034 {
00035 gearman_universal_set_error(gearman, "gearman_connection_create", "malloc");
00036 return NULL;
00037 }
00038
00039 connection->options.allocated= true;
00040 }
00041 else
00042 {
00043 connection->options.allocated= false;
00044 }
00045
00046 connection->options.ready= false;
00047 connection->options.packet_in_use= false;
00048 connection->options.external_fd= false;
00049 connection->options.ignore_lost_connection= false;
00050 connection->options.close_after_flush= false;
00051
00052 if (options)
00053 {
00054 while (*options != GEARMAN_CON_MAX)
00055 {
00056 gearman_connection_set_option(connection, *options, true);
00057 options++;
00058 }
00059 }
00060
00061
00062 connection->state= 0;
00063 connection->send_state= 0;
00064 connection->recv_state= 0;
00065 connection->port= 0;
00066 connection->events= 0;
00067 connection->revents= 0;
00068 connection->fd= -1;
00069 connection->created_id= 0;
00070 connection->created_id_next= 0;
00071 connection->send_buffer_size= 0;
00072 connection->send_data_size= 0;
00073 connection->send_data_offset= 0;
00074 connection->recv_buffer_size= 0;
00075 connection->recv_data_size= 0;
00076 connection->recv_data_offset= 0;
00077 connection->universal= gearman;
00078
00079 if (gearman->con_list != NULL)
00080 gearman->con_list->prev= connection;
00081 connection->next= gearman->con_list;
00082 connection->prev= NULL;
00083 gearman->con_list= connection;
00084 gearman->con_count++;
00085
00086 connection->context= NULL;
00087 connection->addrinfo= NULL;
00088 connection->addrinfo_next= NULL;
00089 connection->send_buffer_ptr= connection->send_buffer;
00090 connection->recv_packet= NULL;
00091 connection->recv_buffer_ptr= connection->recv_buffer;
00092 connection->protocol_context= NULL;
00093 connection->protocol_context_free_fn= NULL;
00094 connection->packet_pack_fn= gearman_packet_pack;
00095 connection->packet_unpack_fn= gearman_packet_unpack;
00096 connection->host[0]= 0;
00097
00098 return connection;
00099 }
00100
00101 gearman_connection_st *gearman_connection_create_args(gearman_universal_st *gearman, gearman_connection_st *connection,
00102 const char *host, in_port_t port)
00103 {
00104 connection= gearman_connection_create(gearman, connection, NULL);
00105 if (connection == NULL)
00106 return NULL;
00107
00108 gearman_connection_set_host(connection, host, port);
00109
00110 return connection;
00111 }
00112
00113 gearman_connection_st *gearman_connection_clone(gearman_universal_st *gearman, gearman_connection_st *connection,
00114 const gearman_connection_st *from)
00115 {
00116 connection= gearman_connection_create(gearman, connection, NULL);
00117
00118 if (from == NULL || connection == NULL)
00119 return connection;
00120
00121 connection->options.ready= from->options.ready;
00122 connection->options.packet_in_use= from->options.packet_in_use;
00123 connection->options.external_fd= from->options.external_fd;
00124 connection->options.ignore_lost_connection= from->options.ignore_lost_connection;
00125 connection->options.close_after_flush= from->options.close_after_flush;
00126
00127 strcpy(connection->host, from->host);
00128 connection->port= from->port;
00129
00130 return connection;
00131 }
00132
00133 void gearman_connection_free(gearman_connection_st *connection)
00134 {
00135 if (connection->fd != -1)
00136 gearman_connection_close(connection);
00137
00138 gearman_connection_reset_addrinfo(connection);
00139
00140 if (connection->protocol_context != NULL && connection->protocol_context_free_fn != NULL)
00141 connection->protocol_context_free_fn(connection, (void *)connection->protocol_context);
00142
00143 if (connection->universal->con_list == connection)
00144 connection->universal->con_list= connection->next;
00145 if (connection->prev != NULL)
00146 connection->prev->next= connection->next;
00147 if (connection->next != NULL)
00148 connection->next->prev= connection->prev;
00149 connection->universal->con_count--;
00150
00151 if (connection->options.packet_in_use)
00152 gearman_packet_free(&(connection->packet));
00153
00154 if (connection->options.allocated)
00155 free(connection);
00156 }
00157
00158 gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
00159 gearman_connection_options_t options,
00160 bool value)
00161 {
00162 switch (options)
00163 {
00164 case GEARMAN_CON_READY:
00165 connection->options.ready= value;
00166 break;
00167 case GEARMAN_CON_PACKET_IN_USE:
00168 connection->options.packet_in_use= value;
00169 break;
00170 case GEARMAN_CON_EXTERNAL_FD:
00171 connection->options.external_fd= value;
00172 break;
00173 case GEARMAN_CON_IGNORE_LOST_CONNECTION:
00174 connection->options.ignore_lost_connection= value;
00175 break;
00176 case GEARMAN_CON_CLOSE_AFTER_FLUSH:
00177 connection->options.close_after_flush= value;
00178 break;
00179 case GEARMAN_CON_MAX:
00180 default:
00181 return GEARMAN_INVALID_COMMAND;
00182 }
00183
00184 return GEARMAN_SUCCESS;
00185 }
00186
00190 static gearman_return_t _con_setsockopt(gearman_connection_st *connection);
00191
00194
00195
00196
00197
00198 void gearman_connection_set_host(gearman_connection_st *connection,
00199 const char *host,
00200 in_port_t port)
00201 {
00202 gearman_connection_reset_addrinfo(connection);
00203
00204 strncpy(connection->host, host == NULL ? GEARMAN_DEFAULT_TCP_HOST : host,
00205 NI_MAXHOST);
00206 connection->host[NI_MAXHOST - 1]= 0;
00207
00208 connection->port= (in_port_t)(port == 0 ? GEARMAN_DEFAULT_TCP_PORT : port);
00209 }
00210
00211 gearman_return_t gearman_connection_set_fd(gearman_connection_st *connection, int fd)
00212 {
00213 gearman_return_t ret;
00214
00215 connection->options.external_fd= true;
00216 connection->fd= fd;
00217 connection->state= GEARMAN_CON_UNIVERSAL_CONNECTED;
00218
00219 ret= _con_setsockopt(connection);
00220 if (ret != GEARMAN_SUCCESS)
00221 {
00222 connection->universal->last_errno= errno;
00223 return ret;
00224 }
00225
00226 return GEARMAN_SUCCESS;
00227 }
00228
00229 void *gearman_connection_context(const gearman_connection_st *connection)
00230 {
00231 return connection->context;
00232 }
00233
00234 void gearman_connection_set_context(gearman_connection_st *connection, void *context)
00235 {
00236 connection->context= context;
00237 }
00238
00239 gearman_return_t gearman_connection_connect(gearman_connection_st *connection)
00240 {
00241 return gearman_connection_flush(connection);
00242 }
00243
00244 void gearman_connection_close(gearman_connection_st *connection)
00245 {
00246 if (connection->fd == -1)
00247 return;
00248
00249 if (connection->options.external_fd)
00250 connection->options.external_fd= false;
00251 else
00252 (void)close(connection->fd);
00253
00254 connection->state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
00255 connection->fd= -1;
00256 connection->events= 0;
00257 connection->revents= 0;
00258
00259 connection->send_state= GEARMAN_CON_SEND_STATE_NONE;
00260 connection->send_buffer_ptr= connection->send_buffer;
00261 connection->send_buffer_size= 0;
00262 connection->send_data_size= 0;
00263 connection->send_data_offset= 0;
00264
00265 connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00266 if (connection->recv_packet != NULL)
00267 {
00268 gearman_packet_free(connection->recv_packet);
00269 connection->recv_packet= NULL;
00270 }
00271
00272 connection->recv_buffer_ptr= connection->recv_buffer;
00273 connection->recv_buffer_size= 0;
00274 }
00275
00276 void gearman_connection_reset_addrinfo(gearman_connection_st *connection)
00277 {
00278 if (connection->addrinfo != NULL)
00279 {
00280 freeaddrinfo(connection->addrinfo);
00281 connection->addrinfo= NULL;
00282 }
00283
00284 connection->addrinfo_next= NULL;
00285 }
00286
00287 gearman_return_t gearman_connection_send(gearman_connection_st *connection,
00288 const gearman_packet_st *packet, bool flush)
00289 {
00290 gearman_return_t ret;
00291 size_t send_size;
00292
00293 switch (connection->send_state)
00294 {
00295 case GEARMAN_CON_SEND_STATE_NONE:
00296 if (! (packet->options.complete))
00297 {
00298 gearman_universal_set_error(connection->universal, "gearman_connection_send",
00299 "packet not complete");
00300 return GEARMAN_INVALID_PACKET;
00301 }
00302
00303
00304 while (1)
00305 {
00306 send_size= connection->packet_pack_fn(packet, connection,
00307 connection->send_buffer + connection->send_buffer_size,
00308 GEARMAN_SEND_BUFFER_SIZE -
00309 connection->send_buffer_size, &ret);
00310 if (ret == GEARMAN_SUCCESS)
00311 {
00312 connection->send_buffer_size+= send_size;
00313 break;
00314 }
00315 else if (ret == GEARMAN_IGNORE_PACKET)
00316 return GEARMAN_SUCCESS;
00317 else if (ret != GEARMAN_FLUSH_DATA)
00318 return ret;
00319
00320
00321 if (connection->send_buffer_size == 0)
00322 {
00323 gearman_universal_set_error(connection->universal, "gearman_connection_send",
00324 "send buffer too small (%u)",
00325 GEARMAN_SEND_BUFFER_SIZE);
00326 return GEARMAN_SEND_BUFFER_TOO_SMALL;
00327 }
00328
00329
00330 connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH;
00331
00332 case GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH:
00333 ret= gearman_connection_flush(connection);
00334 if (ret != GEARMAN_SUCCESS)
00335 return ret;
00336 }
00337
00338
00339 if (packet->data_size == 0)
00340 break;
00341
00342
00343 if (packet->data != NULL &&
00344 (GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
00345 {
00346 connection->send_data_offset= GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size;
00347 if (connection->send_data_offset > packet->data_size)
00348 connection->send_data_offset= packet->data_size;
00349
00350 memcpy(connection->send_buffer + connection->send_buffer_size, packet->data,
00351 connection->send_data_offset);
00352 connection->send_buffer_size+= connection->send_data_offset;
00353
00354
00355 if (connection->send_data_offset == packet->data_size)
00356 {
00357 connection->send_data_offset= 0;
00358 break;
00359 }
00360 }
00361
00362
00363 connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH;
00364
00365 case GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH:
00366 ret= gearman_connection_flush(connection);
00367 if (ret != GEARMAN_SUCCESS)
00368 return ret;
00369
00370 connection->send_data_size= packet->data_size;
00371
00372
00373 if (packet->data == NULL)
00374 {
00375 connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
00376 return GEARMAN_SUCCESS;
00377 }
00378
00379
00380 connection->send_buffer_size= packet->data_size - connection->send_data_offset;
00381 if (connection->send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
00382 {
00383 memcpy(connection->send_buffer,
00384 (char *)packet->data + connection->send_data_offset,
00385 connection->send_buffer_size);
00386 connection->send_data_size= 0;
00387 connection->send_data_offset= 0;
00388 break;
00389 }
00390
00391 connection->send_buffer_ptr= (char *)packet->data + connection->send_data_offset;
00392 connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
00393
00394 case GEARMAN_CON_SEND_UNIVERSAL_FLUSH:
00395 case GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA:
00396 ret= gearman_connection_flush(connection);
00397 if (ret == GEARMAN_SUCCESS && connection->options.close_after_flush)
00398 {
00399 gearman_connection_close(connection);
00400 ret= GEARMAN_LOST_CONNECTION;
00401 }
00402 return ret;
00403
00404 default:
00405 gearman_universal_set_error(connection->universal, "gearman_connection_send", "unknown state: %u",
00406 connection->send_state);
00407 return GEARMAN_UNKNOWN_STATE;
00408 }
00409
00410 if (flush)
00411 {
00412 connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH;
00413 ret= gearman_connection_flush(connection);
00414 if (ret == GEARMAN_SUCCESS && connection->options.close_after_flush)
00415 {
00416 gearman_connection_close(connection);
00417 ret= GEARMAN_LOST_CONNECTION;
00418 }
00419 return ret;
00420 }
00421
00422 connection->send_state= GEARMAN_CON_SEND_STATE_NONE;
00423 return GEARMAN_SUCCESS;
00424 }
00425
00426 size_t gearman_connection_send_data(gearman_connection_st *connection, const void *data,
00427 size_t data_size, gearman_return_t *ret_ptr)
00428 {
00429 if (connection->send_state != GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
00430 {
00431 gearman_universal_set_error(connection->universal, "gearman_connection_send_data", "not flushing");
00432 return GEARMAN_NOT_FLUSHING;
00433 }
00434
00435 if (data_size > (connection->send_data_size - connection->send_data_offset))
00436 {
00437 gearman_universal_set_error(connection->universal, "gearman_connection_send_data", "data too large");
00438 return GEARMAN_DATA_TOO_LARGE;
00439 }
00440
00441 connection->send_buffer_ptr= (char *)data;
00442 connection->send_buffer_size= data_size;
00443
00444 *ret_ptr= gearman_connection_flush(connection);
00445
00446 return data_size - connection->send_buffer_size;
00447 }
00448
00449 gearman_return_t gearman_connection_flush(gearman_connection_st *connection)
00450 {
00451 char port_str[NI_MAXSERV];
00452 struct addrinfo ai;
00453 int ret;
00454 ssize_t write_size;
00455 gearman_return_t gret;
00456
00457 while (1)
00458 {
00459 switch (connection->state)
00460 {
00461 case GEARMAN_CON_UNIVERSAL_ADDRINFO:
00462 if (connection->addrinfo != NULL)
00463 {
00464 freeaddrinfo(connection->addrinfo);
00465 connection->addrinfo= NULL;
00466 }
00467
00468 snprintf(port_str, NI_MAXSERV, "%hu", (uint16_t)connection->port);
00469
00470 memset(&ai, 0, sizeof(struct addrinfo));
00471 ai.ai_socktype= SOCK_STREAM;
00472 ai.ai_protocol= IPPROTO_TCP;
00473
00474 ret= getaddrinfo(connection->host, port_str, &ai, &(connection->addrinfo));
00475 if (ret != 0)
00476 {
00477 gearman_universal_set_error(connection->universal, "gearman_connection_flush", "getaddrinfo:%s",
00478 gai_strerror(ret));
00479 return GEARMAN_GETADDRINFO;
00480 }
00481
00482 connection->addrinfo_next= connection->addrinfo;
00483
00484 case GEARMAN_CON_UNIVERSAL_CONNECT:
00485 if (connection->fd != -1)
00486 gearman_connection_close(connection);
00487
00488 if (connection->addrinfo_next == NULL)
00489 {
00490 connection->state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
00491 gearman_universal_set_error(connection->universal, "gearman_connection_flush",
00492 "could not connect");
00493 return GEARMAN_COULD_NOT_CONNECT;
00494 }
00495
00496 connection->fd= socket(connection->addrinfo_next->ai_family,
00497 connection->addrinfo_next->ai_socktype,
00498 connection->addrinfo_next->ai_protocol);
00499 if (connection->fd == -1)
00500 {
00501 connection->state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
00502 gearman_universal_set_error(connection->universal, "gearman_connection_flush", "socket:%d",
00503 errno);
00504 connection->universal->last_errno= errno;
00505 return GEARMAN_ERRNO;
00506 }
00507
00508 gret= _con_setsockopt(connection);
00509 if (gret != GEARMAN_SUCCESS)
00510 {
00511 connection->universal->last_errno= errno;
00512 gearman_connection_close(connection);
00513 return gret;
00514 }
00515
00516 while (1)
00517 {
00518 ret= connect(connection->fd, connection->addrinfo_next->ai_addr,
00519 connection->addrinfo_next->ai_addrlen);
00520 if (ret == 0)
00521 {
00522 connection->state= GEARMAN_CON_UNIVERSAL_CONNECTED;
00523 connection->addrinfo_next= NULL;
00524 break;
00525 }
00526
00527 if (errno == EAGAIN || errno == EINTR)
00528 continue;
00529
00530 if (errno == EINPROGRESS)
00531 {
00532 connection->state= GEARMAN_CON_UNIVERSAL_CONNECTING;
00533 break;
00534 }
00535
00536 if (errno == ECONNREFUSED || errno == ENETUNREACH || errno == ETIMEDOUT)
00537 {
00538 connection->state= GEARMAN_CON_UNIVERSAL_CONNECT;
00539 connection->addrinfo_next= connection->addrinfo_next->ai_next;
00540 break;
00541 }
00542
00543 gearman_universal_set_error(connection->universal, "gearman_connection_flush", "connect:%d",
00544 errno);
00545 connection->universal->last_errno= errno;
00546 gearman_connection_close(connection);
00547 return GEARMAN_ERRNO;
00548 }
00549
00550 if (connection->state != GEARMAN_CON_UNIVERSAL_CONNECTING)
00551 break;
00552
00553 case GEARMAN_CON_UNIVERSAL_CONNECTING:
00554 while (1)
00555 {
00556 if (connection->revents & POLLOUT)
00557 {
00558 connection->state= GEARMAN_CON_UNIVERSAL_CONNECTED;
00559 break;
00560 }
00561 else if (connection->revents & (POLLERR | POLLHUP | POLLNVAL))
00562 {
00563 connection->state= GEARMAN_CON_UNIVERSAL_CONNECT;
00564 connection->addrinfo_next= connection->addrinfo_next->ai_next;
00565 break;
00566 }
00567
00568 gret= gearman_connection_set_events(connection, POLLOUT);
00569 if (gret != GEARMAN_SUCCESS)
00570 return gret;
00571
00572 if (gearman_universal_is_non_blocking(connection->universal))
00573 {
00574 connection->state= GEARMAN_CON_UNIVERSAL_CONNECTING;
00575 return GEARMAN_IO_WAIT;
00576 }
00577
00578 gret= gearman_wait(connection->universal);
00579 if (gret != GEARMAN_SUCCESS)
00580 return gret;
00581 }
00582
00583 if (connection->state != GEARMAN_CON_UNIVERSAL_CONNECTED)
00584 break;
00585
00586 case GEARMAN_CON_UNIVERSAL_CONNECTED:
00587 while (connection->send_buffer_size != 0)
00588 {
00589 write_size= write(connection->fd, connection->send_buffer_ptr, connection->send_buffer_size);
00590 if (write_size == 0)
00591 {
00592 if (! (connection->options.ignore_lost_connection))
00593 {
00594 gearman_universal_set_error(connection->universal, "gearman_connection_flush",
00595 "lost connection to server (EOF)");
00596 }
00597 gearman_connection_close(connection);
00598 return GEARMAN_LOST_CONNECTION;
00599 }
00600 else if (write_size == -1)
00601 {
00602 if (errno == EAGAIN)
00603 {
00604 gret= gearman_connection_set_events(connection, POLLOUT);
00605 if (gret != GEARMAN_SUCCESS)
00606 return gret;
00607
00608 if (gearman_universal_is_non_blocking(connection->universal))
00609 return GEARMAN_IO_WAIT;
00610
00611 gret= gearman_wait(connection->universal);
00612 if (gret != GEARMAN_SUCCESS)
00613 return gret;
00614
00615 continue;
00616 }
00617 else if (errno == EINTR)
00618 continue;
00619 else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
00620 {
00621 if (! (connection->options.ignore_lost_connection))
00622 {
00623 gearman_universal_set_error(connection->universal, "gearman_connection_flush",
00624 "lost connection to server (%d)", errno);
00625 }
00626 gearman_connection_close(connection);
00627 return GEARMAN_LOST_CONNECTION;
00628 }
00629
00630 gearman_universal_set_error(connection->universal, "gearman_connection_flush", "write:%d",
00631 errno);
00632 connection->universal->last_errno= errno;
00633 gearman_connection_close(connection);
00634 return GEARMAN_ERRNO;
00635 }
00636
00637 connection->send_buffer_size-= (size_t)write_size;
00638 if (connection->send_state == GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
00639 {
00640 connection->send_data_offset+= (size_t)write_size;
00641 if (connection->send_data_offset == connection->send_data_size)
00642 {
00643 connection->send_data_size= 0;
00644 connection->send_data_offset= 0;
00645 break;
00646 }
00647
00648 if (connection->send_buffer_size == 0)
00649 return GEARMAN_SUCCESS;
00650 }
00651 else if (connection->send_buffer_size == 0)
00652 break;
00653
00654 connection->send_buffer_ptr+= write_size;
00655 }
00656
00657 connection->send_state= GEARMAN_CON_SEND_STATE_NONE;
00658 connection->send_buffer_ptr= connection->send_buffer;
00659 return GEARMAN_SUCCESS;
00660
00661 default:
00662 gearman_universal_set_error(connection->universal, "gearman_connection_flush", "unknown state: %u",
00663 connection->state);
00664
00665 return GEARMAN_UNKNOWN_STATE;
00666 }
00667 }
00668 }
00669
00670 gearman_packet_st *gearman_connection_recv(gearman_connection_st *connection,
00671 gearman_packet_st *packet,
00672 gearman_return_t *ret_ptr, bool recv_data)
00673 {
00674 size_t recv_size;
00675
00676 switch (connection->recv_state)
00677 {
00678 case GEARMAN_CON_RECV_UNIVERSAL_NONE:
00679 if (connection->state != GEARMAN_CON_UNIVERSAL_CONNECTED)
00680 {
00681 gearman_universal_set_error(connection->universal, "gearman_connection_recv", "not connected");
00682 *ret_ptr= GEARMAN_NOT_CONNECTED;
00683 return NULL;
00684 }
00685
00686 connection->recv_packet= gearman_packet_create(connection->universal, packet);
00687 if (connection->recv_packet == NULL)
00688 {
00689 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00690 return NULL;
00691 }
00692
00693 connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_READ;
00694
00695 case GEARMAN_CON_RECV_UNIVERSAL_READ:
00696 while (1)
00697 {
00698 if (connection->recv_buffer_size > 0)
00699 {
00700 recv_size= connection->packet_unpack_fn(connection->recv_packet, connection,
00701 connection->recv_buffer_ptr,
00702 connection->recv_buffer_size, ret_ptr);
00703 connection->recv_buffer_ptr+= recv_size;
00704 connection->recv_buffer_size-= recv_size;
00705 if (*ret_ptr == GEARMAN_SUCCESS)
00706 break;
00707 else if (*ret_ptr != GEARMAN_IO_WAIT)
00708 {
00709 gearman_connection_close(connection);
00710 return NULL;
00711 }
00712 }
00713
00714
00715 if (connection->recv_buffer_size > 0)
00716 memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
00717 connection->recv_buffer_ptr= connection->recv_buffer;
00718
00719 recv_size= gearman_connection_read(connection, connection->recv_buffer + connection->recv_buffer_size,
00720 GEARMAN_RECV_BUFFER_SIZE - connection->recv_buffer_size,
00721 ret_ptr);
00722 if (*ret_ptr != GEARMAN_SUCCESS)
00723 return NULL;
00724
00725 connection->recv_buffer_size+= recv_size;
00726 }
00727
00728 if (packet->data_size == 0)
00729 {
00730 connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00731 break;
00732 }
00733
00734 connection->recv_data_size= packet->data_size;
00735
00736 if (!recv_data)
00737 {
00738 connection->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
00739 break;
00740 }
00741
00742 if (packet->universal->workload_malloc_fn == NULL)
00743 {
00744 packet->data= malloc(packet->data_size);
00745 }
00746 else
00747 {
00748 packet->data= packet->universal->workload_malloc_fn(packet->data_size,
00749 (void *)packet->universal->workload_malloc_context);
00750 }
00751 if (packet->data == NULL)
00752 {
00753 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00754 gearman_connection_close(connection);
00755 return NULL;
00756 }
00757
00758 packet->options.free_data= true;
00759 connection->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
00760
00761 case GEARMAN_CON_RECV_STATE_READ_DATA:
00762 while (connection->recv_data_size != 0)
00763 {
00764 (void)gearman_connection_recv_data(connection,
00765 ((uint8_t *)(packet->data)) +
00766 connection->recv_data_offset,
00767 packet->data_size -
00768 connection->recv_data_offset, ret_ptr);
00769 if (*ret_ptr != GEARMAN_SUCCESS)
00770 return NULL;
00771 }
00772
00773 connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00774 break;
00775
00776 default:
00777 gearman_universal_set_error(connection->universal, "gearman_connection_recv", "unknown state: %u",
00778 connection->recv_state);
00779 *ret_ptr= GEARMAN_UNKNOWN_STATE;
00780 return NULL;
00781 }
00782
00783 packet= connection->recv_packet;
00784 connection->recv_packet= NULL;
00785
00786 return packet;
00787 }
00788
00789 size_t gearman_connection_recv_data(gearman_connection_st *connection, void *data, size_t data_size,
00790 gearman_return_t *ret_ptr)
00791 {
00792 size_t recv_size= 0;
00793
00794 if (connection->recv_data_size == 0)
00795 {
00796 *ret_ptr= GEARMAN_SUCCESS;
00797 return 0;
00798 }
00799
00800 if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
00801 data_size= connection->recv_data_size - connection->recv_data_offset;
00802
00803 if (connection->recv_buffer_size > 0)
00804 {
00805 if (connection->recv_buffer_size < data_size)
00806 recv_size= connection->recv_buffer_size;
00807 else
00808 recv_size= data_size;
00809
00810 memcpy(data, connection->recv_buffer_ptr, recv_size);
00811 connection->recv_buffer_ptr+= recv_size;
00812 connection->recv_buffer_size-= recv_size;
00813 }
00814
00815 if (data_size != recv_size)
00816 {
00817 recv_size+= gearman_connection_read(connection, ((uint8_t *)data) + recv_size,
00818 data_size - recv_size, ret_ptr);
00819 connection->recv_data_offset+= recv_size;
00820 }
00821 else
00822 {
00823 connection->recv_data_offset+= recv_size;
00824 *ret_ptr= GEARMAN_SUCCESS;
00825 }
00826
00827 if (connection->recv_data_size == connection->recv_data_offset)
00828 {
00829 connection->recv_data_size= 0;
00830 connection->recv_data_offset= 0;
00831 connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00832 }
00833
00834 return recv_size;
00835 }
00836
00837 size_t gearman_connection_read(gearman_connection_st *connection, void *data, size_t data_size,
00838 gearman_return_t *ret_ptr)
00839 {
00840 ssize_t read_size;
00841
00842 while (1)
00843 {
00844 read_size= read(connection->fd, data, data_size);
00845 if (read_size == 0)
00846 {
00847 if (! (connection->options.ignore_lost_connection))
00848 {
00849 gearman_universal_set_error(connection->universal, "gearman_connection_read",
00850 "lost connection to server (EOF)");
00851 }
00852 gearman_connection_close(connection);
00853 *ret_ptr= GEARMAN_LOST_CONNECTION;
00854 return 0;
00855 }
00856 else if (read_size == -1)
00857 {
00858 if (errno == EAGAIN)
00859 {
00860 *ret_ptr= gearman_connection_set_events(connection, POLLIN);
00861 if (*ret_ptr != GEARMAN_SUCCESS)
00862 return 0;
00863
00864 if (gearman_universal_is_non_blocking(connection->universal))
00865 {
00866 *ret_ptr= GEARMAN_IO_WAIT;
00867 return 0;
00868 }
00869
00870 *ret_ptr= gearman_wait(connection->universal);
00871 if (*ret_ptr != GEARMAN_SUCCESS)
00872 return 0;
00873
00874 continue;
00875 }
00876 else if (errno == EINTR)
00877 continue;
00878 else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
00879 {
00880 if (! (connection->options.ignore_lost_connection))
00881 {
00882 gearman_universal_set_error(connection->universal, "gearman_connection_read",
00883 "lost connection to server (%d)", errno);
00884 }
00885 *ret_ptr= GEARMAN_LOST_CONNECTION;
00886 }
00887 else
00888 {
00889 gearman_universal_set_error(connection->universal, "gearman_connection_read", "read:%d", errno);
00890 connection->universal->last_errno= errno;
00891 *ret_ptr= GEARMAN_ERRNO;
00892 }
00893
00894 gearman_connection_close(connection);
00895 return 0;
00896 }
00897
00898 break;
00899 }
00900
00901 *ret_ptr= GEARMAN_SUCCESS;
00902 return (size_t)read_size;
00903 }
00904
00905 gearman_return_t gearman_connection_set_events(gearman_connection_st *connection, short events)
00906 {
00907 gearman_return_t ret;
00908
00909 if ((connection->events | events) == connection->events)
00910 return GEARMAN_SUCCESS;
00911
00912 connection->events|= events;
00913
00914 if (connection->universal->event_watch_fn != NULL)
00915 {
00916 ret= connection->universal->event_watch_fn(connection, connection->events,
00917 (void *)connection->universal->event_watch_context);
00918 if (ret != GEARMAN_SUCCESS)
00919 {
00920 gearman_connection_close(connection);
00921 return ret;
00922 }
00923 }
00924
00925 return GEARMAN_SUCCESS;
00926 }
00927
00928 gearman_return_t gearman_connection_set_revents(gearman_connection_st *connection, short revents)
00929 {
00930 gearman_return_t ret;
00931
00932 if (revents != 0)
00933 connection->options.ready= true;
00934
00935 connection->revents= revents;
00936
00937
00938
00939
00940
00941 if (revents & POLLOUT && !(connection->events & POLLOUT) &&
00942 connection->universal->event_watch_fn != NULL)
00943 {
00944 ret= connection->universal->event_watch_fn(connection, connection->events,
00945 (void *)connection->universal->event_watch_context);
00946 if (ret != GEARMAN_SUCCESS)
00947 {
00948 gearman_connection_close(connection);
00949 return ret;
00950 }
00951 }
00952
00953 connection->events&= (short)~revents;
00954
00955 return GEARMAN_SUCCESS;
00956 }
00957
00958 void *gearman_connection_protocol_context(const gearman_connection_st *connection)
00959 {
00960 return connection->protocol_context;
00961 }
00962
00963 void gearman_connection_set_protocol_context(gearman_connection_st *connection, void *context)
00964 {
00965 connection->protocol_context= context;
00966 }
00967
00968 void gearman_connection_set_protocol_context_free_fn(gearman_connection_st *connection,
00969 gearman_connection_protocol_context_free_fn *function)
00970 {
00971 connection->protocol_context_free_fn= function;
00972 }
00973
00974 void gearman_connection_set_packet_pack_fn(gearman_connection_st *connection,
00975 gearman_packet_pack_fn *function)
00976 {
00977 connection->packet_pack_fn= function;
00978 }
00979
00980 void gearman_connection_set_packet_unpack_fn(gearman_connection_st *connection,
00981 gearman_packet_unpack_fn *function)
00982 {
00983 connection->packet_unpack_fn= function;
00984 }
00985
00986
00987
00988
00989
00990 static gearman_return_t _con_setsockopt(gearman_connection_st *connection)
00991 {
00992 int ret;
00993 struct linger linger;
00994 struct timeval waittime;
00995
00996 ret= 1;
00997 ret= setsockopt(connection->fd, IPPROTO_TCP, TCP_NODELAY, &ret,
00998 (socklen_t)sizeof(int));
00999 if (ret == -1 && errno != EOPNOTSUPP)
01000 {
01001 gearman_universal_set_error(connection->universal, "_con_setsockopt",
01002 "setsockopt:TCP_NODELAY:%d", errno);
01003 return GEARMAN_ERRNO;
01004 }
01005
01006 linger.l_onoff= 1;
01007 linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
01008 ret= setsockopt(connection->fd, SOL_SOCKET, SO_LINGER, &linger,
01009 (socklen_t)sizeof(struct linger));
01010 if (ret == -1)
01011 {
01012 gearman_universal_set_error(connection->universal, "_con_setsockopt",
01013 "setsockopt:SO_LINGER:%d", errno);
01014 return GEARMAN_ERRNO;
01015 }
01016
01017 waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
01018 waittime.tv_usec= 0;
01019 ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDTIMEO, &waittime,
01020 (socklen_t)sizeof(struct timeval));
01021 if (ret == -1 && errno != ENOPROTOOPT)
01022 {
01023 gearman_universal_set_error(connection->universal, "_con_setsockopt",
01024 "setsockopt:SO_SNDTIMEO:%d", errno);
01025 return GEARMAN_ERRNO;
01026 }
01027
01028 ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVTIMEO, &waittime,
01029 (socklen_t)sizeof(struct timeval));
01030 if (ret == -1 && errno != ENOPROTOOPT)
01031 {
01032 gearman_universal_set_error(connection->universal, "_con_setsockopt",
01033 "setsockopt:SO_RCVTIMEO:%d", errno);
01034 return GEARMAN_ERRNO;
01035 }
01036
01037 ret= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
01038 ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDBUF, &ret, (socklen_t)sizeof(int));
01039 if (ret == -1)
01040 {
01041 gearman_universal_set_error(connection->universal, "_con_setsockopt",
01042 "setsockopt:SO_SNDBUF:%d", errno);
01043 return GEARMAN_ERRNO;
01044 }
01045
01046 ret= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
01047 ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVBUF, &ret, (socklen_t)sizeof(int));
01048 if (ret == -1)
01049 {
01050 gearman_universal_set_error(connection->universal, "_con_setsockopt",
01051 "setsockopt:SO_RCVBUF:%d", errno);
01052 return GEARMAN_ERRNO;
01053 }
01054
01055 ret= fcntl(connection->fd, F_GETFL, 0);
01056 if (ret == -1)
01057 {
01058 gearman_universal_set_error(connection->universal, "_con_setsockopt", "fcntl:F_GETFL:%d",
01059 errno);
01060 return GEARMAN_ERRNO;
01061 }
01062
01063 ret= fcntl(connection->fd, F_SETFL, ret | O_NONBLOCK);
01064 if (ret == -1)
01065 {
01066 gearman_universal_set_error(connection->universal, "_con_setsockopt", "fcntl:F_SETFL:%d",
01067 errno);
01068 return GEARMAN_ERRNO;
01069 }
01070
01071 return GEARMAN_SUCCESS;
01072 }