pacemaker  1.1.18-2b07d5c5a9
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2019 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
#include <crm_internal.h>
#include <bzlib.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

#include <crm/common/ipc.h>
#include <crm/cluster/internal.h>
#include <crm/common/mainloop.h>
#include <sys/utsname.h>

#include <qb/qbipcc.h>
#include <qb/qbutil.h>

#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
#include <corosync/cpg.h>

#include <crm/msg_xml.h>

#include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
33 
34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
35 
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38 
/*
 * Evaluate `code`, retrying it while it leaves the variable `rc` (which must
 * be in scope at the call site) equal to CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL. Each retryable result increments `counter` and sleeps
 * for `counter` seconds; the loop gives up once `counter` reaches `max`.
 * Any other result in `rc` ends the loop immediately.
 *
 * Arguments are parenthesized so expressions such as `*p` behave correctly.
 */
#define cs_repeat(counter, max, code) do {                              \
        code;                                                           \
        if ((rc) == CS_ERR_TRY_AGAIN || (rc) == CS_ERR_QUEUE_FULL) {    \
            (counter)++;                                                \
            crm_debug("Retrying operation after %ds", (counter));       \
            sleep(counter);                                             \
        } else {                                                        \
            break;                                                      \
        }                                                               \
    } while ((counter) < (max))
49 
50 void
52 {
53  pcmk_cpg_handle = 0;
54  if (cluster->cpg_handle) {
55  crm_trace("Disconnecting CPG");
56  cpg_leave(cluster->cpg_handle, &cluster->group);
57  cpg_finalize(cluster->cpg_handle);
58  cluster->cpg_handle = 0;
59 
60  } else {
61  crm_info("No CPG connection");
62  }
63 }
64 
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67  cs_error_t rc = CS_OK;
68  int retries = 0;
69  static uint32_t local_nodeid = 0;
70  cpg_handle_t local_handle = handle;
71  cpg_callbacks_t cb = { };
72  int fd = -1;
73  uid_t found_uid = 0;
74  gid_t found_gid = 0;
75  pid_t found_pid = 0;
76  int rv;
77 
78  if(local_nodeid != 0) {
79  return local_nodeid;
80  }
81 
82 #if 0
83  /* Should not be necessary */
85  get_ais_details(&local_nodeid, NULL);
86  goto done;
87  }
88 #endif
89 
90  if(handle == 0) {
91  crm_trace("Creating connection");
92  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
93  if (rc != CS_OK) {
94  crm_err("Could not connect to the CPG API: %s (%d)",
95  cs_strerror(rc), rc);
96  return 0;
97  }
98 
99  rc = cpg_fd_get(local_handle, &fd);
100  if (rc != CS_OK) {
101  crm_err("Could not obtain the CPG API connection: %s (%d)",
102  cs_strerror(rc), rc);
103  goto bail;
104  }
105 
106  /* CPG provider run as root (in given user namespace, anyway)? */
107  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
108  &found_uid, &found_gid))) {
109  crm_err("CPG provider is not authentic:"
110  " process %lld (uid: %lld, gid: %lld)",
111  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
112  (long long) found_uid, (long long) found_gid);
113  goto bail;
114  } else if (rv < 0) {
115  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
116  strerror(-rv), -rv);
117  goto bail;
118  }
119  }
120 
121  if (rc == CS_OK) {
122  retries = 0;
123  crm_trace("Performing lookup");
124  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
125  }
126 
127  if (rc != CS_OK) {
128  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
129  }
130 
131 bail:
132  if(handle == 0) {
133  crm_trace("Closing connection");
134  cpg_finalize(local_handle);
135  }
136  crm_debug("Local nodeid is %u", local_nodeid);
137  return local_nodeid;
138 }
139 
140 
143 
144 static ssize_t crm_cs_flush(gpointer data);
145 
146 static gboolean
147 crm_cs_flush_cb(gpointer data)
148 {
149  cs_message_timer = 0;
150  crm_cs_flush(data);
151  return FALSE;
152 }
153 
154 #define CS_SEND_MAX 200
155 static ssize_t
156 crm_cs_flush(gpointer data)
157 {
158  int sent = 0;
159  ssize_t rc = 0;
160  int queue_len = 0;
161  static unsigned int last_sent = 0;
162  cpg_handle_t *handle = (cpg_handle_t *)data;
163 
164  if (*handle == 0) {
165  crm_trace("Connection is dead");
166  return pcmk_ok;
167  }
168 
169  queue_len = g_list_length(cs_message_queue);
170  if ((queue_len % 1000) == 0 && queue_len > 1) {
171  crm_err("CPG queue has grown to %d", queue_len);
172 
173  } else if (queue_len == CS_SEND_MAX) {
174  crm_warn("CPG queue has grown to %d", queue_len);
175  }
176 
177  if (cs_message_timer) {
178  /* There is already a timer, wait until it goes off */
179  crm_trace("Timer active %d", cs_message_timer);
180  return pcmk_ok;
181  }
182 
183  while (cs_message_queue && sent < CS_SEND_MAX) {
184  struct iovec *iov = cs_message_queue->data;
185 
186  errno = 0;
187  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
188 
189  if (rc != CS_OK) {
190  break;
191  }
192 
193  sent++;
194  last_sent++;
195  crm_trace("CPG message sent, size=%llu",
196  (unsigned long long) iov->iov_len);
197 
198  cs_message_queue = g_list_remove(cs_message_queue, iov);
199  free(iov->iov_base);
200  free(iov);
201  }
202 
203  queue_len -= sent;
204  if (sent > 1 || cs_message_queue) {
205  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
206  sent, queue_len, last_sent, ais_error2text(rc),
207  (long long) rc);
208  } else {
209  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
210  sent, queue_len, last_sent, ais_error2text(rc),
211  (long long) rc);
212  }
213 
214  if (cs_message_queue) {
215  uint32_t delay_ms = 100;
216  if(rc != CS_OK) {
217  /* Proportionally more if sending failed but cap at 1s */
218  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
219  }
220  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
221  }
222 
223  return rc;
224 }
225 
226 gboolean
227 send_cpg_iov(struct iovec * iov)
228 {
229  static unsigned int queued = 0;
230 
231  queued++;
232  crm_trace("Queueing CPG message %u (%llu bytes)",
233  queued, (unsigned long long) iov->iov_len);
234  cs_message_queue = g_list_append(cs_message_queue, iov);
235  crm_cs_flush(&pcmk_cpg_handle);
236  return TRUE;
237 }
238 
239 static int
240 pcmk_cpg_dispatch(gpointer user_data)
241 {
242  int rc = 0;
243  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
244 
245  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
246  if (rc != CS_OK) {
247  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
248  cluster->cpg_handle = 0;
249  return -1;
250 
251  } else if(cpg_evicted) {
252  crm_err("Evicted from CPG membership");
253  return -1;
254  }
255  return 0;
256 }
257 
258 char *
259 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
260  uint32_t *kind, const char **from)
261 {
262  char *data = NULL;
263  AIS_Message *msg = (AIS_Message *) content;
264 
265  if(handle) {
266  /* 'msg' came from CPG not the plugin
267  * Do filtering and field massaging
268  */
270  const char *local_name = get_local_node_name();
271 
272  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
273  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
274  return NULL;
275 
276  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
277  /* Not for us */
278  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
279  return NULL;
280  } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
281  /* Not for us */
282  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
283  return NULL;
284  }
285 
286  msg->sender.id = nodeid;
287  if (msg->sender.size == 0) {
288  crm_node_t *peer = crm_get_peer(nodeid, NULL);
289 
290  if (peer == NULL) {
291  crm_err("Peer with nodeid=%u is unknown", nodeid);
292 
293  } else if (peer->uname == NULL) {
294  crm_err("No uname for peer with nodeid=%u", nodeid);
295 
296  } else {
297  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
298  msg->sender.size = strlen(peer->uname);
299  memset(msg->sender.uname, 0, MAX_NAME);
300  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
301  }
302  }
303  }
304 
305  crm_trace("Got new%s message (size=%d, %d, %d)",
306  msg->is_compressed ? " compressed" : "",
307  ais_data_len(msg), msg->size, msg->compressed_size);
308 
309  if (kind != NULL) {
310  *kind = msg->header.id;
311  }
312  if (from != NULL) {
313  *from = msg->sender.uname;
314  }
315 
316  if (msg->is_compressed && msg->size > 0) {
317  int rc = BZ_OK;
318  char *uncompressed = NULL;
319  unsigned int new_size = msg->size + 1;
320 
321  if (check_message_sanity(msg, NULL) == FALSE) {
322  goto badmsg;
323  }
324 
325  crm_trace("Decompressing message data");
326  uncompressed = calloc(1, new_size);
327  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
328 
329  if (rc != BZ_OK) {
330  crm_err("Decompression failed: %d", rc);
331  free(uncompressed);
332  goto badmsg;
333  }
334 
335  CRM_ASSERT(rc == BZ_OK);
336  CRM_ASSERT(new_size == msg->size);
337 
338  data = uncompressed;
339 
340  } else if (check_message_sanity(msg, data) == FALSE) {
341  goto badmsg;
342 
343  } else if (safe_str_eq("identify", data)) {
344  char *pid_s = crm_getpid_s();
345 
346  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
347  free(pid_s);
348  return NULL;
349 
350  } else {
351  data = strdup(msg->data);
352  }
353 
354  if (msg->header.id != crm_class_members) {
355  /* Is this even needed anymore? */
356  crm_get_peer(msg->sender.id, msg->sender.uname);
357  }
358 
359  if (msg->header.id == crm_class_rmpeer) {
360  uint32_t id = crm_int_helper(data, NULL);
361 
362  crm_info("Removing peer %s/%u", data, id);
363  reap_crm_member(id, NULL);
364  free(data);
365  return NULL;
366 
367 #if SUPPORT_PLUGIN
368  } else if (is_classic_ais_cluster()) {
370 #endif
371  }
372 
373  crm_trace("Payload: %.200s", data);
374  return data;
375 
376  badmsg:
377  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
378  " min=%d, total=%d, size=%d, bz2_size=%d",
379  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
380  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
381  msg->sender.pid, (int)sizeof(AIS_Message),
382  msg->header.size, msg->size, msg->compressed_size);
383 
384  free(data);
385  return NULL;
386 }
387 
388 static int cmp_member_list_nodeid(const void *first,
389  const void *second)
390 {
391  const struct cpg_address *const a = *((const struct cpg_address **) first),
392  *const b = *((const struct cpg_address **) second);
393  if (a->nodeid < b->nodeid) {
394  return -1;
395  } else if (a->nodeid > b->nodeid) {
396  return 1;
397  }
398  /* don't bother with "reason" nor "pid" */
399  return 0;
400 }
401 
402 void
403 pcmk_cpg_membership(cpg_handle_t handle,
404  const struct cpg_name *groupName,
405  const struct cpg_address *member_list, size_t member_list_entries,
406  const struct cpg_address *left_list, size_t left_list_entries,
407  const struct cpg_address *joined_list, size_t joined_list_entries)
408 {
409  int i;
410  gboolean found = FALSE;
411  static int counter = 0;
413  const struct cpg_address *key, **rival, **sorted;
414 
415  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
416  CRM_ASSERT(sorted != NULL);
417 
418  for (size_t iter = 0; iter < member_list_entries; iter++) {
419  sorted[iter] = member_list + iter;
420  }
421  /* so that the cross-matching multiply-subscribed nodes is then cheap */
422  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
423  cmp_member_list_nodeid);
424 
425  for (i = 0; i < left_list_entries; i++) {
426  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
427 
428  crm_info("Node %u left group %s (peer=%s:%llu, counter=%d.%d)",
429  left_list[i].nodeid, groupName->value,
430  (peer? peer->uname : "<none>"),
431  (unsigned long long) left_list[i].pid, counter, i);
432 
433  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
434  and not playing by this rule may go wild in case of multiple
435  residual instances of the same pacemaker daemon at the same node
436  -- we must ensure that the possible local rival(s) won't make us
437  cry out and bail (e.g. when they quit themselves), since all the
438  surrounding logic denies this simple fact that the full membership
439  is discriminated also per the PID of the process beside mere node
440  ID (and implicitly, group ID); practically, this will be sound in
441  terms of not preventing progress, since all the CPG joiners are
442  also API end-point carriers, and that's what matters locally
443  (who's the winner);
444  remotely, we will just compare leave_list and member_list and if
445  the left process has it's node retained in member_list (under some
446  other PID, anyway) we will just ignore it as well
447  XXX: long-term fix is to establish in-out PID-aware tracking? */
448  if (peer) {
449  key = &left_list[i];
450  rival = bsearch(&key, sorted, member_list_entries,
451  sizeof(const struct cpg_address *),
452  cmp_member_list_nodeid);
453  if (rival == NULL) {
454  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg,
455  OFFLINESTATUS);
456  } else if (left_list[i].nodeid == local_nodeid) {
457  crm_info("Ignoring the above event %s.%d, comes from a local"
458  " rival process (presumably not us): %llu",
459  groupName->value, counter,
460  (unsigned long long) left_list[i].pid);
461  } else {
462  crm_info("Ignoring the above event %s.%d, comes from"
463  " a rival-rich node: %llu (e.g. %llu process"
464  " carries on)",
465  groupName->value, counter,
466  (unsigned long long) left_list[i].pid,
467  (unsigned long long) (*rival)->pid);
468  }
469  }
470  }
471  free(sorted);
472  sorted = NULL;
473 
474  for (i = 0; i < joined_list_entries; i++) {
475  crm_info("Node %u joined group %s (counter=%d.%d, pid=%llu,"
476  " unchecked for rivals)",
477  joined_list[i].nodeid, groupName->value, counter, i,
478  (unsigned long long) left_list[i].pid);
479  }
480 
481  for (i = 0; i < member_list_entries; i++) {
482  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
483 
484  crm_info("Node %u still member of group %s (peer=%s:%llu,"
485  " counter=%d.%d, at least once)",
486  member_list[i].nodeid, groupName->value,
487  (peer? peer->uname : "<none>"), member_list[i].pid,
488  counter, i);
489 
490  if (member_list[i].nodeid == local_nodeid
491  && member_list[i].pid != getpid()) {
492  /* see the note above */
493  crm_info("Ignoring the above event %s.%d, comes from a local rival"
494  " process: %llu", groupName->value, counter,
495  (unsigned long long) member_list[i].pid);
496  continue;
497  }
498 
499  /* Anyone that is sending us CPG messages must also be a _CPG_ member.
500  * But it's _not_ safe to assume it's in the quorum membership.
501  * We may have just found out it's dead and are processing the last couple of messages it sent
502  */
503  peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
504  if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
505  time_t now = time(NULL);
506 
507  /* Co-opt the otherwise unused votes field */
508  if(peer->votes == 0) {
509  peer->votes = now;
510 
511  } else if(now > (60 + peer->votes)) {
512  /* On the otherhand, if we're still getting messages, at a certain point
513  * we need to acknowledge our internal cache is probably wrong
514  *
515  * Set the threshold to 1 minute
516  */
517  crm_err("Node %s[%u] appears to be online even though we think"
518  " it is dead (unchecked for rivals)",
519  peer->uname, peer->id);
520  if (crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0)) {
521  peer->votes = 0;
522  }
523  }
524  }
525 
526  if (local_nodeid == member_list[i].nodeid) {
527  found = TRUE;
528  }
529  }
530 
531  if (!found) {
532  crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
533  cpg_evicted = TRUE;
534  }
535 
536  counter++;
537 }
538 
539 gboolean
541 {
542  cs_error_t rc;
543  int fd = -1;
544  int retries = 0;
545  uint32_t id = 0;
546  crm_node_t *peer = NULL;
547  cpg_handle_t handle = 0;
548  uid_t found_uid = 0;
549  gid_t found_gid = 0;
550  pid_t found_pid = 0;
551  int rv;
552 
553  struct mainloop_fd_callbacks cpg_fd_callbacks = {
554  .dispatch = pcmk_cpg_dispatch,
555  .destroy = cluster->destroy,
556  };
557 
558  cpg_callbacks_t cpg_callbacks = {
559  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
560  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
561  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
562  /* .cpg_confchg_fn = pcmk_cpg_membership, */
563  };
564 
565  cpg_evicted = FALSE;
566  cluster->group.length = 0;
567  cluster->group.value[0] = 0;
568 
569  /* group.value is char[128] */
570  strncpy(cluster->group.value, crm_system_name?crm_system_name:"unknown", 127);
571  cluster->group.value[127] = 0;
572  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
573 
574  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
575  if (rc != CS_OK) {
576  crm_err("Could not connect to the CPG API: %s (%d)",
577  cs_strerror(rc), rc);
578  goto bail;
579  }
580 
581  rc = cpg_fd_get(handle, &fd);
582  if (rc != CS_OK) {
583  crm_err("Could not obtain the CPG API connection: %s (%d)",
584  cs_strerror(rc), rc);
585  goto bail;
586  }
587 
588  /* CPG provider run as root (in given user namespace, anyway)? */
589  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
590  &found_uid, &found_gid))) {
591  crm_err("CPG provider is not authentic:"
592  " process %lld (uid: %lld, gid: %lld)",
593  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
594  (long long) found_uid, (long long) found_gid);
595  rc = CS_ERR_ACCESS;
596  goto bail;
597  } else if (rv < 0) {
598  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
599  strerror(-rv), -rv);
600  rc = CS_ERR_ACCESS;
601  goto bail;
602  }
603 
604  id = get_local_nodeid(handle);
605  if (id == 0) {
606  crm_err("Could not get local node id from the CPG API");
607  goto bail;
608 
609  }
610  cluster->nodeid = id;
611 
612  retries = 0;
613  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
614  if (rc != CS_OK) {
615  crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
616  goto bail;
617  }
618 
619  pcmk_cpg_handle = handle;
620  cluster->cpg_handle = handle;
621  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
622 
623  bail:
624  if (rc != CS_OK) {
625  cpg_finalize(handle);
626  return FALSE;
627  }
628 
629  peer = crm_get_peer(id, NULL);
630  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
631  return TRUE;
632 }
633 
634 gboolean
635 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
636 {
637  gboolean rc = TRUE;
638  char *data = NULL;
639 
640  data = dump_xml_unformatted(msg);
641  rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
642  free(data);
643  return rc;
644 }
645 
646 gboolean
647 send_cluster_text(int class, const char *data,
648  gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
649 {
650  static int msg_id = 0;
651  static int local_pid = 0;
652  static int local_name_len = 0;
653  static const char *local_name = NULL;
654 
655  char *target = NULL;
656  struct iovec *iov;
657  AIS_Message *msg = NULL;
659 
660  /* There are only 6 handlers registered to crm_lib_service in plugin.c */
661  CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
662  return FALSE);
663 
664 #if !SUPPORT_PLUGIN
665  CRM_CHECK(dest != crm_msg_ais, return FALSE);
666 #endif
667 
668  if(local_name == NULL) {
669  local_name = get_local_node_name();
670  }
671  if(local_name_len == 0 && local_name) {
672  local_name_len = strlen(local_name);
673  }
674 
675  if (data == NULL) {
676  data = "";
677  }
678 
679  if (local_pid == 0) {
680  local_pid = getpid();
681  }
682 
683  if (sender == crm_msg_none) {
684  sender = local_pid;
685  }
686 
687  msg = calloc(1, sizeof(AIS_Message));
688 
689  msg_id++;
690  msg->id = msg_id;
691  msg->header.id = class;
692  msg->header.error = CS_OK;
693 
694  msg->host.type = dest;
695  msg->host.local = local;
696 
697  if (node) {
698  if (node->uname) {
699  target = strdup(node->uname);
700  msg->host.size = strlen(node->uname);
701  memset(msg->host.uname, 0, MAX_NAME);
702  memcpy(msg->host.uname, node->uname, msg->host.size);
703  } else {
704  target = crm_strdup_printf("%u", node->id);
705  }
706  msg->host.id = node->id;
707  } else {
708  target = strdup("all");
709  }
710 
711  msg->sender.id = 0;
712  msg->sender.type = sender;
713  msg->sender.pid = local_pid;
714  msg->sender.size = local_name_len;
715  memset(msg->sender.uname, 0, MAX_NAME);
716  if(local_name && msg->sender.size) {
717  memcpy(msg->sender.uname, local_name, msg->sender.size);
718  }
719 
720  msg->size = 1 + strlen(data);
721  msg->header.size = sizeof(AIS_Message) + msg->size;
722 
723  if (msg->size < CRM_BZ2_THRESHOLD) {
724  msg = realloc_safe(msg, msg->header.size);
725  memcpy(msg->data, data, msg->size);
726 
727  } else {
728  char *compressed = NULL;
729  unsigned int new_size = 0;
730  char *uncompressed = strdup(data);
731 
732  if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
733 
734  msg->header.size = sizeof(AIS_Message) + new_size;
735  msg = realloc_safe(msg, msg->header.size);
736  memcpy(msg->data, compressed, new_size);
737 
738  msg->is_compressed = TRUE;
739  msg->compressed_size = new_size;
740 
741  } else {
742  msg = realloc_safe(msg, msg->header.size);
743  memcpy(msg->data, data, msg->size);
744  }
745 
746  free(uncompressed);
747  free(compressed);
748  }
749 
750  iov = calloc(1, sizeof(struct iovec));
751  iov->iov_base = msg;
752  iov->iov_len = msg->header.size;
753 
754  if (msg->compressed_size) {
755  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
756  msg->id, target, (unsigned long long) iov->iov_len,
757  msg->compressed_size, data);
758  } else {
759  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
760  msg->id, target, (unsigned long long) iov->iov_len,
761  msg->size, data);
762  }
763  free(target);
764 
765 #if SUPPORT_PLUGIN
766  /* The plugin is the only time we don't use CPG messaging */
768  return send_plugin_text(class, iov);
769  }
770 #endif
771 
772  send_cpg_iov(iov);
773 
774  return TRUE;
775 }
776 
778 text2msg_type(const char *text)
779 {
780  int type = crm_msg_none;
781 
782  CRM_CHECK(text != NULL, return type);
783  if (safe_str_eq(text, "ais")) {
784  type = crm_msg_ais;
785  } else if (safe_str_eq(text, "crm_plugin")) {
786  type = crm_msg_ais;
787  } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
788  type = crm_msg_cib;
789  } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
790  type = crm_msg_crmd;
791  } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
792  type = crm_msg_crmd;
793  } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
794  type = crm_msg_te;
795  } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
796  type = crm_msg_pe;
797  } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
798  type = crm_msg_lrmd;
799  } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
800  type = crm_msg_stonithd;
801  } else if (safe_str_eq(text, "stonith-ng")) {
802  type = crm_msg_stonith_ng;
803  } else if (safe_str_eq(text, "attrd")) {
804  type = crm_msg_attrd;
805 
806  } else {
807  /* This will normally be a transient client rather than
808  * a cluster daemon. Set the type to the pid of the client
809  */
810  int scan_rc = sscanf(text, "%d", &type);
811 
812  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
813  /* Ensure it's sane */
814  type = crm_msg_none;
815  }
816  }
817  return type;
818 }
bool send_plugin_text(int class, struct iovec *iov)
Definition: legacy.c:135
enum crm_ais_msg_types type
Definition: internal.h:38
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:164
char data[0]
Definition: internal.h:55
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:227
uint32_t local_nodeid
Definition: plugin.c:65
#define crm_notice(fmt, args...)
Definition: logging.h:250
gboolean is_compressed
Definition: internal.h:47
uint32_t size
Definition: internal.h:52
gboolean safe_str_neq(const char *a, const char *b)
Definition: strings.c:150
crm_ais_msg_types
Definition: cluster.h:128
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:810
uint32_t nodeid
Definition: cluster.h:97
uint32_t id
Definition: cluster.h:73
gboolean crm_is_peer_active(const crm_node_t *node)
Definition: membership.c:295
const char * get_local_node_name(void)
Definition: cluster.c:289
void(* destroy)(gpointer)
Definition: cluster.h:99
#define pcmk_ok
Definition: error.h:42
long long crm_int_helper(const char *text, char **end_text)
Definition: strings.c:80
uint32_t id
Definition: internal.h:35
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:34
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:676
char * crm_system_name
Definition: utils.c:61
#define CS_SEND_MAX
Definition: cpg.c:154
uint32_t pid
Definition: internal.h:49
char * strerror(int errnum)
AIS_Host sender
Definition: internal.h:53
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:259
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:86
void plugin_handle_membership(AIS_Message *msg)
Definition: legacy.c:222
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:51
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:90
int cs_message_timer
Definition: cpg.c:142
#define crm_warn(fmt, args...)
Definition: logging.h:249
uint32_t id
Definition: internal.h:48
#define crm_debug(fmt, args...)
Definition: logging.h:253
GListPtr cs_message_queue
Definition: cpg.c:141
#define crm_trace(fmt, args...)
Definition: logging.h:254
gboolean local
Definition: internal.h:37
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:891
#define CRM_SYSTEM_PENGINE
Definition: crm.h:92
AIS_Host sender
Definition: internal.h:50
uint32_t id
Definition: internal.h:46
gboolean send_cluster_text(int class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:647
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: plugin.c:1356
struct crm_ais_msg_s AIS_Message
Definition: internal.h:32
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:34
#define ais_data_len(msg)
Definition: internal.h:208
uint32_t size
Definition: internal.h:39
#define CRM_NODE_MEMBER
Definition: cluster.h:44
guint reap_crm_member(uint32_t id, const char *name)
Remove all peer cache entries matching a node ID and/or uname.
Definition: membership.c:352
uint32_t compressed_size
Definition: internal.h:53
uint32_t counter
Definition: internal.h:50
#define MAX_NAME
Definition: crm.h:42
#define CRM_SYSTEM_CRMD
Definition: crm.h:90
bool crm_compress_string(const char *data, int length, int max, char **result, unsigned int *result_len)
Definition: strings.c:413
#define CRM_SYSTEM_STONITHD
Definition: crm.h:94
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership)
Update a node's state and membership information.
Definition: membership.c:1077
#define CRM_SYSTEM_CIB
Definition: crm.h:89
#define CRM_SYSTEM_TENGINE
Definition: crm.h:93
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:65
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:37
#define crm_err(fmt, args...)
Definition: logging.h:248
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:124
char uname[MAX_NAME]
Definition: internal.h:40
#define OFFLINESTATUS
Definition: util.h:53
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:778
#define CRM_BZ2_THRESHOLD
Definition: xml.h:50
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3825
#define CRM_SYSTEM_LRMD
Definition: crm.h:91
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:635
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
char data[0]
Definition: internal.h:58
char * state
Definition: cluster.h:84
Wrappers for and extensions to libqb IPC.
int32_t votes
Definition: cluster.h:78
uint32_t pid
Definition: internal.h:36
char * uname
Definition: cluster.h:82
#define cs_repeat(counter, max, code)
Definition: cpg.c:39
AIS_Host host
Definition: internal.h:49
#define safe_str_eq(a, b)
Definition: util.h:72
#define ONLINESTATUS
Definition: util.h:52
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:540
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:403
GList * GListPtr
Definition: crm.h:219
#define crm_info(fmt, args...)
Definition: logging.h:251
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:540
gboolean is_classic_ais_cluster(void)
Definition: cluster.c:624
enum crm_ais_msg_types type
Definition: internal.h:51
enum cluster_type_e get_cluster_type(void)
Definition: cluster.c:513
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
Definition: ipc.c:1376
gboolean local
Definition: internal.h:50