47 #include <sys/types.h>
48 #include <sys/socket.h>
55 #include <qb/qblist.h>
56 #include <qb/qbdefs.h>
57 #include <qb/qbipcc.h>
69 #define MAP_ANONYMOUS MAP_ANON
76 #define MAX_RETRIES 100
81 #define CPG_MEMORY_MAP_UMASK 077
85 struct qb_list_head
list;
93 qb_ipcc_connection_t *
c;
104 static void cpg_inst_free (
void *inst);
112 struct qb_list_head
list;
123 coroipcc_msg_send_reply_receive (
124 qb_ipcc_connection_t *c,
125 const struct iovec *iov,
126 unsigned int iov_len,
130 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
140 static void cpg_inst_free (
void *inst)
148 struct qb_list_head *iter, *tmp_iter;
159 hdb_handle_destroy (&cpg_handle_t_db, handle);
196 goto error_no_destroy;
200 if (error !=
CS_OK) {
201 goto error_no_destroy;
205 if (error !=
CS_OK) {
212 goto error_put_destroy;
237 hdb_handle_put (&cpg_handle_t_db, *handle);
242 hdb_handle_put (&cpg_handle_t_db, *handle);
244 hdb_handle_destroy (&cpg_handle_t_db, *handle);
259 if (error !=
CS_OK) {
267 hdb_handle_put (&cpg_handle_t_db, handle);
282 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c,
288 cpg_inst_finalize (
cpg_inst, handle);
289 hdb_handle_put (&cpg_handle_t_db, handle);
302 if (error !=
CS_OK) {
308 hdb_handle_put (&cpg_handle_t_db, handle);
321 if (error !=
CS_OK) {
327 hdb_handle_put (&cpg_handle_t_db, handle);
340 if (error !=
CS_OK) {
346 hdb_handle_put (&cpg_handle_t_db, handle);
359 if (error !=
CS_OK) {
365 hdb_handle_put (&cpg_handle_t_db, handle);
383 struct qb_ipc_response_header *dispatch_data;
389 struct qb_list_head *iter, *tmp_iter;
399 if (error !=
CS_OK) {
411 dispatch_data = (
struct qb_ipc_response_header *)dispatch_buf;
413 errno_res = qb_ipcc_event_recv (
437 if (error !=
CS_OK) {
452 switch (dispatch_data->id) {
460 marshall_from_mar_cpg_name_t (
462 &res_cpg_deliver_callback->group_name);
466 res_cpg_deliver_callback->nodeid,
467 res_cpg_deliver_callback->pid,
468 &res_cpg_deliver_callback->message,
469 res_cpg_deliver_callback->msglen);
475 marshall_from_mar_cpg_name_t (
477 &res_cpg_partial_deliver_callback->group_name);
482 assembly_data = NULL;
485 if (current_assembly_data->
nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->
pid == res_cpg_partial_deliver_callback->pid) {
486 assembly_data = current_assembly_data;
499 qb_list_del (&assembly_data->
list);
502 assembly_data = NULL;
506 if (!assembly_data) {
511 assembly_data->
nodeid = res_cpg_partial_deliver_callback->nodeid;
512 assembly_data->
pid = res_cpg_partial_deliver_callback->pid;
513 assembly_data->
assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
520 qb_list_init (&assembly_data->
list);
526 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
527 assembly_data->
assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
532 res_cpg_partial_deliver_callback->nodeid,
533 res_cpg_partial_deliver_callback->pid,
535 res_cpg_partial_deliver_callback->msglen);
537 qb_list_del (&assembly_data->
list);
551 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
555 left_list_start = res_cpg_confchg_callback->
member_list +
556 res_cpg_confchg_callback->member_list_entries;
557 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
558 marshall_from_mar_cpg_address_t (&left_list[i],
559 &left_list_start[i]);
561 joined_list_start = res_cpg_confchg_callback->
member_list +
562 res_cpg_confchg_callback->member_list_entries +
563 res_cpg_confchg_callback->left_list_entries;
564 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
565 marshall_from_mar_cpg_address_t (&joined_list[i],
566 &joined_list_start[i]);
568 marshall_from_mar_cpg_name_t (
570 &res_cpg_confchg_callback->group_name);
575 res_cpg_confchg_callback->member_list_entries,
577 res_cpg_confchg_callback->left_list_entries,
579 res_cpg_confchg_callback->joined_list_entries);
584 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
587 if (current_assembly_data->
nodeid != left_list[i].
nodeid || current_assembly_data->
pid != left_list[i].
pid)
590 qb_list_del (&current_assembly_data->
list);
592 free(current_assembly_data);
604 marshall_from_mar_cpg_ring_id_t (&
ring_id, &res_cpg_totem_confchg_callback->ring_id);
605 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
606 totem_member_list[i] = res_cpg_totem_confchg_callback->
member_list[i];
611 res_cpg_totem_confchg_callback->member_list_entries,
640 hdb_handle_put (&cpg_handle_t_db, handle);
659 if (error !=
CS_OK) {
682 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 1,
685 if (error !=
CS_OK) {
690 error = response.header.error;
693 hdb_handle_put (&cpg_handle_t_db, handle);
713 if (error !=
CS_OK) {
727 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 1,
730 if (error !=
CS_OK) {
738 hdb_handle_put (&cpg_handle_t_db, handle);
747 int *member_list_entries)
762 if (member_list_entries == NULL) {
767 if (error !=
CS_OK) {
780 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, &iov, 1,
783 if (error !=
CS_OK) {
795 marshall_from_mar_cpg_address_t (&member_list[i],
801 hdb_handle_put (&cpg_handle_t_db, handle);
808 unsigned int *local_nodeid)
817 if (error !=
CS_OK) {
827 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, &iov, 1,
830 if (error !=
CS_OK) {
839 hdb_handle_put (&cpg_handle_t_db, handle);
852 if (error !=
CS_OK) {
858 hdb_handle_put (&cpg_handle_t_db, handle);
864 memory_map (
char *path,
const char *file,
void **buf,
size_t bytes)
873 long int sysconf_page_size;
876 snprintf (path, PATH_MAX,
"/dev/shm/%s", file);
880 (void)umask(old_umask);
885 (void)umask(old_umask);
891 res = ftruncate (fd, bytes);
893 goto error_close_unlink;
895 sysconf_page_size = sysconf(_SC_PAGESIZE);
896 if (sysconf_page_size <= 0) {
897 goto error_close_unlink;
899 page_size = sysconf_page_size;
900 buffer = malloc (page_size);
901 if (buffer == NULL) {
902 goto error_close_unlink;
904 memset (buffer, 0, page_size);
905 for (i = 0; i < (bytes / page_size); i++) {
907 written = write (fd, buffer, page_size);
908 if (written == -1 && errno == EINTR) {
911 if (written != page_size) {
913 goto error_close_unlink;
918 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
921 if (
addr == MAP_FAILED) {
922 goto error_close_unlink;
925 madvise(
addr, bytes, MADV_NOSYNC);
952 struct qb_ipc_response_header res_coroipcs_zc_alloc;
960 if (error !=
CS_OK) {
965 assert(memory_map (path,
"corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
969 munmap (buf, map_size);
975 req_coroipcc_zc_alloc.map_size = map_size;
976 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
978 iovec.iov_base = (
void *)&req_coroipcc_zc_alloc;
981 error = coroipcc_msg_send_reply_receive (
985 &res_coroipcs_zc_alloc,
986 sizeof (
struct qb_ipc_response_header));
988 if (error !=
CS_OK) {
997 hdb_handle_put (&cpg_handle_t_db, handle);
1009 struct qb_ipc_response_header res_coroipcs_zc_free;
1014 if (error !=
CS_OK) {
1020 req_coroipcc_zc_free.map_size =
header->map_size;
1021 req_coroipcc_zc_free.server_address =
header->server_address;
1023 iovec.iov_base = (
void *)&req_coroipcc_zc_free;
1026 error = coroipcc_msg_send_reply_receive (
1030 &res_coroipcs_zc_free,
1031 sizeof (
struct qb_ipc_response_header));
1033 if (error !=
CS_OK) {
1045 hdb_handle_put (&cpg_handle_t_db, handle);
1065 if (error !=
CS_OK) {
1088 iovec.iov_base = (
void *)&req_coroipcc_zc_execute;
1091 error = coroipcc_msg_send_reply_receive (
1098 if (error !=
CS_OK) {
1105 hdb_handle_put (&cpg_handle_t_db, handle);
1114 const struct iovec *iovec,
1115 unsigned int iov_len)
1119 struct iovec iov[2];
1123 size_t iov_sent = 0;
1135 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 2);
1137 while (error ==
CS_OK && sent < msg_len) {
1144 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1150 else if ((sent + iov[1].iov_len) == msg_len) {
1159 iov[1].iov_base = (
char *)iovec[i].iov_base + iov_sent;
1162 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 2,
1167 fprintf(stderr,
"sleep. counter=%d\n", retry_count);
1175 iov_sent += iov[1].iov_len;
1176 sent += iov[1].iov_len;
1179 if (iov_sent >= iovec[i].iov_len) {
1186 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 1);
1195 const struct iovec *iovec,
1196 unsigned int iov_len)
1201 struct iovec iov[64];
1206 if (error !=
CS_OK) {
1210 for (i = 0; i < iov_len; i++ ) {
1211 msg_len += iovec[i].iov_len;
1228 memcpy (&iov[1], iovec, iov_len *
sizeof (
struct iovec));
1230 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 2);
1232 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 1);
1235 hdb_handle_put (&cpg_handle_t_db, handle);
1256 if (cpg_iteration_handle == NULL) {
1272 if (error !=
CS_OK) {
1276 error =
hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1278 if (error !=
CS_OK) {
1279 goto error_put_cpg_db;
1282 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1284 if (error !=
CS_OK) {
1302 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c,
1308 if (error !=
CS_OK) {
1309 goto error_put_destroy;
1318 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1319 hdb_handle_put (&cpg_handle_t_db, handle);
1324 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1326 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1328 hdb_handle_put (&cpg_handle_t_db, handle);
1342 if (description == NULL) {
1346 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1348 if (error !=
CS_OK) {
1359 if (error !=
CS_OK) {
1366 if (error !=
CS_OK) {
1370 marshall_from_mar_cpg_iteration_description_t(
1377 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1392 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1394 if (error !=
CS_OK) {
1411 if (error !=
CS_OK) {
1421 hdb_handle_put (&cpg_iteration_handle_t_db, handle);