pacemaker  1.1.15-e174ec8
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
mainloop.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <crm_internal.h>
20 
21 #ifndef _GNU_SOURCE
22 # define _GNU_SOURCE
23 #endif
24 
25 #include <stdlib.h>
26 #include <signal.h>
27 #include <errno.h>
28 
29 #include <sys/wait.h>
30 
31 #include <crm/crm.h>
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
34 #include <crm/common/ipcs.h>
35 
36 struct mainloop_child_s {
37  pid_t pid;
38  char *desc;
39  unsigned timerid;
40  unsigned watchid;
41  gboolean timeout;
42  void *privatedata;
43 
45 
46  /* Called when a process dies */
47  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
48 };
49 
/* A mainloop source that is only dispatched after it has been explicitly triggered */
struct trigger_s {
    GSource source;             /* pointers to this struct are cast to/from GSource * in this file */
    gboolean running;           /* TRUE while a previous dispatch reported itself incomplete */
    gboolean trigger;           /* TRUE when a dispatch has been requested */
    void *user_data;            /* handed to the dispatch callback */
    guint id;                   /* value returned by g_source_attach() */

};
58 
59 static gboolean
60 crm_trigger_prepare(GSource * source, gint * timeout)
61 {
62  crm_trigger_t *trig = (crm_trigger_t *) source;
63 
64  /* cluster-glue's FD and IPC related sources make use of
65  * g_source_add_poll() but do not set a timeout in their prepare
66  * functions
67  *
68  * This means mainloop's poll() will block until an event for one
69  * of these sources occurs - any /other/ type of source, such as
70  * this one or g_idle_*, that doesn't use g_source_add_poll() is
71  * S-O-L and won't be processed until there is something fd-based
72  * happens.
73  *
74  * Luckily the timeout we can set here affects all sources and
75  * puts an upper limit on how long poll() can take.
76  *
77  * So unconditionally set a small-ish timeout, not too small that
78  * we're in constant motion, which will act as an upper bound on
79  * how long the signal handling might be delayed for.
80  */
81  *timeout = 500; /* Timeout in ms */
82 
83  return trig->trigger;
84 }
85 
86 static gboolean
87 crm_trigger_check(GSource * source)
88 {
89  crm_trigger_t *trig = (crm_trigger_t *) source;
90 
91  return trig->trigger;
92 }
93 
94 static gboolean
95 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
96 {
97  int rc = TRUE;
98  crm_trigger_t *trig = (crm_trigger_t *) source;
99 
100  if (trig->running) {
101  /* Wait until the existing job is complete before starting the next one */
102  return TRUE;
103  }
104  trig->trigger = FALSE;
105 
106  if (callback) {
107  rc = callback(trig->user_data);
108  if (rc < 0) {
109  crm_trace("Trigger handler %p not yet complete", trig);
110  trig->running = TRUE;
111  rc = TRUE;
112  }
113  }
114  return rc;
115 }
116 
/* GSource "finalize" hook: only logs that the trigger is going away */
static void
crm_trigger_finalize(GSource * source)
{
    crm_trace("Trigger %p destroyed", source);
}
122 
123 #if 0
124 struct _GSourceCopy
125 {
126  gpointer callback_data;
127  GSourceCallbackFuncs *callback_funcs;
128 
129  const GSourceFuncs *source_funcs;
130  guint ref_count;
131 
132  GMainContext *context;
133 
134  gint priority;
135  guint flags;
136  guint source_id;
137 
138  GSList *poll_fds;
139 
140  GSource *prev;
141  GSource *next;
142 
143  char *name;
144 
145  void *priv;
146 };
147 
148 static int
149 g_source_refcount(GSource * source)
150 {
151  /* Duplicating the contents of private header files is a necessary evil */
152  if (source) {
153  struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
154  return evil->ref_count;
155  }
156  return 0;
157 }
158 #else
/* Stub used while the private-struct peeking variant above is compiled out:
 * always reports a reference count of 0, whatever the source is.
 */
static int g_source_refcount(GSource * source)
{
    return 0;
}
163 #endif
164 
165 static GSourceFuncs crm_trigger_funcs = {
166  crm_trigger_prepare,
167  crm_trigger_check,
168  crm_trigger_dispatch,
169  crm_trigger_finalize,
170 };
171 
172 static crm_trigger_t *
173 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
174  gpointer userdata)
175 {
176  crm_trigger_t *trigger = NULL;
177 
178  trigger = (crm_trigger_t *) source;
179 
180  trigger->id = 0;
181  trigger->trigger = FALSE;
182  trigger->user_data = userdata;
183 
184  if (dispatch) {
185  g_source_set_callback(source, dispatch, trigger, NULL);
186  }
187 
188  g_source_set_priority(source, priority);
189  g_source_set_can_recurse(source, FALSE);
190 
191  crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
192  trigger->id = g_source_attach(source, NULL);
193  crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
194 
195  return trigger;
196 }
197 
198 void
200 {
201  crm_trace("Trigger handler %p complete", trig);
202  trig->running = FALSE;
203 }
204 
205 /* If dispatch returns:
206  * -1: Job running but not complete
207  * 0: Remove the trigger from mainloop
208  * 1: Leave the trigger in mainloop
209  */
211 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
212 {
213  GSource *source = NULL;
214 
215  CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
216  source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
217  CRM_ASSERT(source != NULL);
218 
219  return mainloop_setup_trigger(source, priority, dispatch, userdata);
220 }
221 
222 void
224 {
225  if(source) {
226  source->trigger = TRUE;
227  }
228 }
229 
230 gboolean
232 {
233  GSource *gs = NULL;
234 
235  if(source == NULL) {
236  return TRUE;
237  }
238 
239  gs = (GSource *)source;
240 
241  if(g_source_refcount(gs) > 2) {
242  crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
243  }
244 
245  g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
246  g_source_unref(gs); /* The caller no longer carries a reference to source
247  *
248  * At this point the source should be free'd,
249  * unless we're currently processing said
250  * source, in which case mainloop holds an
251  * additional reference and it will be free'd
252  * once our processing completes
253  */
254  return TRUE;
255 }
256 
/* A trigger that is fired from a signal handler and dispatched by mainloop */
typedef struct signal_s {
    crm_trigger_t trigger;      /* must be first */
    void (*handler) (int sig);  /* run from mainloop context when the signal was caught */
    int signal;                 /* signal number this entry was registered for */

} crm_signal_t;
263 
264 static crm_signal_t *crm_signals[NSIG];
265 
266 static gboolean
267 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
268 {
269  crm_signal_t *sig = (crm_signal_t *) source;
270 
271  if(sig->signal != SIGCHLD) {
272  crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
273  strsignal(sig->signal), sig->signal,
274  (sig->handler? "invoking" : "no"));
275  }
276 
277  sig->trigger.trigger = FALSE;
278  if (sig->handler) {
279  sig->handler(sig->signal);
280  }
281  return TRUE;
282 }
283 
284 static void
285 mainloop_signal_handler(int sig)
286 {
287  if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
288  mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
289  }
290 }
291 
292 static GSourceFuncs crm_signal_funcs = {
293  crm_trigger_prepare,
294  crm_trigger_check,
295  crm_signal_dispatch,
296  crm_trigger_finalize,
297 };
298 
299 gboolean
300 crm_signal(int sig, void (*dispatch) (int sig))
301 {
302  sigset_t mask;
303  struct sigaction sa;
304  struct sigaction old;
305 
306  if (sigemptyset(&mask) < 0) {
307  crm_perror(LOG_ERR, "Call to sigemptyset failed");
308  return FALSE;
309  }
310 
311  memset(&sa, 0, sizeof(struct sigaction));
312  sa.sa_handler = dispatch;
313  sa.sa_flags = SA_RESTART;
314  sa.sa_mask = mask;
315 
316  if (sigaction(sig, &sa, &old) < 0) {
317  crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
318  return FALSE;
319  }
320 
321  return TRUE;
322 }
323 
324 gboolean
325 mainloop_add_signal(int sig, void (*dispatch) (int sig))
326 {
327  GSource *source = NULL;
328  int priority = G_PRIORITY_HIGH - 1;
329 
330  if (sig == SIGTERM) {
331  /* TERM is higher priority than other signals,
332  * signals are higher priority than other ipc.
333  * Yes, minus: smaller is "higher"
334  */
335  priority--;
336  }
337 
338  if (sig >= NSIG || sig < 0) {
339  crm_err("Signal %d is out of range", sig);
340  return FALSE;
341 
342  } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
343  crm_trace("Signal handler for %d is already installed", sig);
344  return TRUE;
345 
346  } else if (crm_signals[sig] != NULL) {
347  crm_err("Different signal handler for %d is already installed", sig);
348  return FALSE;
349  }
350 
351  CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
352  source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
353 
354  crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
355  CRM_ASSERT(crm_signals[sig] != NULL);
356 
357  crm_signals[sig]->handler = dispatch;
358  crm_signals[sig]->signal = sig;
359 
360  if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
361  crm_signal_t *tmp = crm_signals[sig];
362 
363  crm_signals[sig] = NULL;
364 
366  return FALSE;
367  }
368 #if 0
369  /* If we want signals to interrupt mainloop's poll(), instead of waiting for
370  * the timeout, then we should call siginterrupt() below
371  *
372  * For now, just enforce a low timeout
373  */
374  if (siginterrupt(sig, 1) < 0) {
375  crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
376  }
377 #endif
378 
379  return TRUE;
380 }
381 
382 gboolean
384 {
385  crm_signal_t *tmp = NULL;
386 
387  if (sig >= NSIG || sig < 0) {
388  crm_err("Signal %d is out of range", sig);
389  return FALSE;
390 
391  } else if (crm_signal(sig, NULL) == FALSE) {
392  crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
393  return FALSE;
394 
395  } else if (crm_signals[sig] == NULL) {
396  return TRUE;
397  }
398 
399  crm_trace("Destroying signal %d", sig);
400  tmp = crm_signals[sig];
401  crm_signals[sig] = NULL;
403  return TRUE;
404 }
405 
406 static qb_array_t *gio_map = NULL;
407 
408 void
410 {
411  if(gio_map) {
412  qb_array_free(gio_map);
413  }
414 }
415 
416 /*
417  * libqb...
418  */
/* Per-descriptor adaptor between libqb's poll callbacks and a GLib I/O watch;
 * entries live in gio_map and are looked up by file descriptor.
 */
struct gio_to_qb_poll {
    int32_t is_used;            /* count of watches currently referring to this entry */
    guint source;               /* id of the GLib watch, 0 when none is installed */
    int32_t events;             /* I/O conditions being watched */
    void *data;                 /* passed back to fn */
    qb_ipcs_dispatch_fn_t fn;   /* libqb callback run when the descriptor is active */
    enum qb_loop_priority p;    /* priority requested by libqb (stored only) */
};
427 
428 static gboolean
429 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
430 {
431  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
432  gint fd = g_io_channel_unix_get_fd(gio);
433 
434  crm_trace("%p.%d %d", data, fd, condition);
435 
436  /* if this assert get's hit, then there is a race condition between
437  * when we destroy a fd and when mainloop actually gives it up */
438  CRM_ASSERT(adaptor->is_used > 0);
439 
440  return (adaptor->fn(fd, condition, adaptor->data) == 0);
441 }
442 
443 static void
444 gio_poll_destroy(gpointer data)
445 {
446  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
447 
448  adaptor->is_used--;
449  CRM_ASSERT(adaptor->is_used >= 0);
450 
451  if (adaptor->is_used == 0) {
452  crm_trace("Marking adaptor %p unused", adaptor);
453  adaptor->source = 0;
454  }
455 }
456 
/*
 * Install (add != 0) or replace (add == 0) the GLib watch for descriptor fd
 * on behalf of libqb.
 *
 * p, evts, data, fn: stored in the adaptor; fn is what gio_read_socket() calls
 *
 * Returns 0 on success, the qb_array_index() error if the lookup fails,
 * -EEXIST when adding over an existing watch, -ENOENT when modifying an
 * unused entry, -ENOMEM if no channel could be created, or -EINVAL if the
 * watch could not be installed.
 */
static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
                         void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
{
    struct gio_to_qb_poll *adaptor;
    GIOChannel *channel;
    int32_t res = 0;

    res = qb_array_index(gio_map, fd, (void **)&adaptor);
    if (res < 0) {
        crm_err("Array lookup failed for fd=%d: %d", fd, res);
        return res;
    }

    crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);

    if (add && adaptor->source) {
        crm_err("Adaptor for descriptor %d is still in-use", fd);
        return -EEXIST;
    }
    if (!add && !adaptor->is_used) {
        crm_err("Adaptor for descriptor %d is not in-use", fd);
        return -ENOENT;
    }

    /* channel is created with ref_count = 1 */
    channel = g_io_channel_unix_new(fd);
    if (!channel) {
        crm_err("No memory left to add fd=%d", fd);
        return -ENOMEM;
    }

    /* Replacing: take down the old watch before the new one is recorded */
    if (adaptor->source) {
        g_source_remove(adaptor->source);
        adaptor->source = 0;
    }

    /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
    evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);

    adaptor->fn = fn;
    adaptor->events = evts;
    adaptor->data = data;
    adaptor->p = p;
    /* Counted before the watch exists; gio_poll_destroy() undoes it later */
    adaptor->is_used++;
    adaptor->source =
        g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
                            gio_poll_destroy);

    /* Now that mainloop now holds a reference to channel,
     * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
     *
     * This means that channel will be free'd by:
     * g_main_context_dispatch()
     *  -> g_source_destroy_internal()
     *      -> g_source_callback_unref()
     * shortly after gio_poll_destroy() completes
     */
    g_io_channel_unref(channel);

    crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
    if (adaptor->source > 0) {
        return 0;
    }

    return -EINVAL;
}
524 
/* libqb "dispatch_add" hook: install a new watch for fd (fails if one exists) */
static int32_t
gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
                      void *data, qb_ipcs_dispatch_fn_t fn)
{
    return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
}
531 
/* libqb "dispatch_mod" hook: replace the existing watch for fd */
static int32_t
gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
                      void *data, qb_ipcs_dispatch_fn_t fn)
{
    return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
}
538 
539 static int32_t
540 gio_poll_dispatch_del(int32_t fd)
541 {
542  struct gio_to_qb_poll *adaptor;
543 
544  crm_trace("Looking for fd=%d", fd);
545  if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
546  if (adaptor->source) {
547  g_source_remove(adaptor->source);
548  adaptor->source = 0;
549  }
550  }
551  return 0;
552 }
553 
/* Poll handlers handed to libqb so its IPC servers are driven by GLib's
 * mainloop; no job_add handler is provided.
 */
struct qb_ipcs_poll_handlers gio_poll_funcs = {
    .job_add = NULL,
    .dispatch_add = gio_poll_dispatch_add,
    .dispatch_mod = gio_poll_dispatch_mod,
    .dispatch_del = gio_poll_dispatch_del,
};
560 
561 static enum qb_ipc_type
562 pick_ipc_type(enum qb_ipc_type requested)
563 {
564  const char *env = getenv("PCMK_ipc_type");
565 
566  if (env && strcmp("shared-mem", env) == 0) {
567  return QB_IPC_SHM;
568  } else if (env && strcmp("socket", env) == 0) {
569  return QB_IPC_SOCKET;
570  } else if (env && strcmp("posix", env) == 0) {
571  return QB_IPC_POSIX_MQ;
572  } else if (env && strcmp("sysv", env) == 0) {
573  return QB_IPC_SYSV_MQ;
574  } else if (requested == QB_IPC_NATIVE) {
575  /* We prefer shared memory because the server never blocks on
576  * send. If part of a message fits into the socket, libqb
577  * needs to block until the remainder can be sent also.
578  * Otherwise the client will wait forever for the remaining
579  * bytes.
580  */
581  return QB_IPC_SHM;
582  }
583  return requested;
584 }
585 
586 qb_ipcs_service_t *
587 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
588  struct qb_ipcs_service_handlers * callbacks)
589 {
590  int rc = 0;
591  qb_ipcs_service_t *server = NULL;
592 
593  if (gio_map == NULL) {
594  gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
595  }
596 
597  crm_client_init();
598  server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
599 
600 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
601  /* All clients should use at least ipc_buffer_max as their buffer size */
602  qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
603 #endif
604 
605  qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
606 
607  rc = qb_ipcs_run(server);
608  if (rc < 0) {
609  crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
610  return NULL;
611  }
612 
613  return server;
614 }
615 
616 void
617 mainloop_del_ipc_server(qb_ipcs_service_t * server)
618 {
619  if (server) {
620  qb_ipcs_destroy(server);
621  }
622 }
623 
/* State for one descriptor (plain fd or IPC connection) watched by mainloop */
struct mainloop_io_s {
    char *name;                 /* heap copy of the caller-supplied name, used in logs */
    void *userdata;             /* passed to the dispatch and destroy callbacks */

    int fd;                     /* descriptor being watched */
    guint source;               /* id returned by g_io_add_watch_full() */
    crm_ipc_t *ipc;             /* set for IPC clients, NULL for plain descriptors */
    GIOChannel *channel;        /* channel wrapping fd; our own reference is dropped after the watch is added */

    /* Used when ipc is set: called once per message read */
    int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
    /* Used when ipc is not set: called when the descriptor is readable */
    int (*dispatch_fn_io) (gpointer userdata);
    /* Called once while the watch is being destroyed */
    void (*destroy_fn) (gpointer userdata);

};
638 
/*
 * GLib watch callback for descriptors added with mainloop_add_fd().
 *
 * On readable input it either drains up to 10 IPC messages through
 * dispatch_fn_ipc, or calls dispatch_fn_io once for plain descriptors.
 * A negative result from either callback, a disconnected IPC connection,
 * or a HUP/NVAL/ERR condition ends the watch.
 *
 * Returns TRUE to keep the watch, FALSE to have mainloop remove it.
 */
static gboolean
mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
{
    gboolean keep = TRUE;
    mainloop_io_t *client = data;

    CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));

    if (condition & G_IO_IN) {
        if (client->ipc) {
            long rc = 0;
            int max = 10;       /* cap on messages handled per invocation */

            do {
                rc = crm_ipc_read(client->ipc);
                if (rc <= 0) {
                    crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
                              client->name, client, pcmk_strerror(rc), rc);

                } else if (client->dispatch_fn_ipc) {
                    const char *buffer = crm_ipc_buffer(client->ipc);

                    crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
                    if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
                        crm_trace("Connection to %s no longer required", client->name);
                        keep = FALSE;
                    }
                }

            } while (keep && rc > 0 && --max > 0);

        } else {
            crm_trace("New message from %s[%p] %u", client->name, client, condition);
            if (client->dispatch_fn_io) {
                if (client->dispatch_fn_io(client->userdata) < 0) {
                    crm_trace("Connection to %s no longer required", client->name);
                    keep = FALSE;
                }
            }
        }
    }

    if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
        crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
        keep = FALSE;

    } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
        crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
                  client->name, client, condition);
        keep = FALSE;

    } else if ((condition & G_IO_IN) == 0) {
        /* Neither input nor an error condition was reported.
         *
         * For reference when decoding the logged value, GIOCondition is a
         * bitwise combination of:
         *
         *   G_IO_IN   (GLIB_SYSDEF_POLLIN   =1)  There is data to read.
         *   G_IO_PRI  (GLIB_SYSDEF_POLLPRI  =2)  There is urgent data to read.
         *   G_IO_OUT  (GLIB_SYSDEF_POLLOUT  =4)  Data can be written (without blocking).
         *   G_IO_ERR  (GLIB_SYSDEF_POLLERR  =8)  Error condition.
         *   G_IO_HUP  (GLIB_SYSDEF_POLLHUP  =16) Hung up (the connection has been broken,
         *                                        usually for pipes and sockets).
         *   G_IO_NVAL (GLIB_SYSDEF_POLLNVAL =32) Invalid request. The file descriptor
         *                                        is not open.
         */
        crm_err("Strange condition: %d", condition);
    }

    /* keep == FALSE results in mainloop_gio_destroy() being called
     * just before the source is removed from mainloop
     */
    return keep;
}
726 
727 static void
728 mainloop_gio_destroy(gpointer c)
729 {
730  mainloop_io_t *client = c;
731  char *c_name = strdup(client->name);
732 
733  /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
734  * client->channel will still have ref_count > 0... should be == 1
735  */
736  crm_trace("Destroying client %s[%p]", c_name, c);
737 
738  if (client->ipc) {
739  crm_ipc_close(client->ipc);
740  }
741 
742  if (client->destroy_fn) {
743  void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
744 
745  client->destroy_fn = NULL;
746  destroy_fn(client->userdata);
747  }
748 
749  if (client->ipc) {
750  crm_ipc_t *ipc = client->ipc;
751 
752  client->ipc = NULL;
753  crm_ipc_destroy(ipc);
754  }
755 
756  crm_trace("Destroyed client %s[%p]", c_name, c);
757 
758  free(client->name); client->name = NULL;
759  free(client);
760 
761  free(c_name);
762 }
763 
765 mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
766  struct ipc_client_callbacks *callbacks)
767 {
768  mainloop_io_t *client = NULL;
769  crm_ipc_t *conn = crm_ipc_new(name, max_size);
770 
771  if (conn && crm_ipc_connect(conn)) {
772  int32_t fd = crm_ipc_get_fd(conn);
773 
774  client = mainloop_add_fd(name, priority, fd, userdata, NULL);
775  }
776 
777  if (client == NULL) {
778  crm_perror(LOG_TRACE, "Connection to %s failed", name);
779  if (conn) {
780  crm_ipc_close(conn);
781  crm_ipc_destroy(conn);
782  }
783  return NULL;
784  }
785 
786  client->ipc = conn;
787  client->destroy_fn = callbacks->destroy;
788  client->dispatch_fn_ipc = callbacks->dispatch;
789  return client;
790 }
791 
792 void
794 {
795  mainloop_del_fd(client);
796 }
797 
798 crm_ipc_t *
800 {
801  if (client) {
802  return client->ipc;
803  }
804  return NULL;
805 }
806 
808 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
809  struct mainloop_fd_callbacks * callbacks)
810 {
811  mainloop_io_t *client = NULL;
812 
813  if (fd >= 0) {
814  client = calloc(1, sizeof(mainloop_io_t));
815  if (client == NULL) {
816  return NULL;
817  }
818  client->name = strdup(name);
819  client->userdata = userdata;
820 
821  if (callbacks) {
822  client->destroy_fn = callbacks->destroy;
823  client->dispatch_fn_io = callbacks->dispatch;
824  }
825 
826  client->fd = fd;
827  client->channel = g_io_channel_unix_new(fd);
828  client->source =
829  g_io_add_watch_full(client->channel, priority,
830  (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
831  client, mainloop_gio_destroy);
832 
833  /* Now that mainloop now holds a reference to channel,
834  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
835  *
836  * This means that channel will be free'd by:
837  * g_main_context_dispatch() or g_source_remove()
838  * -> g_source_destroy_internal()
839  * -> g_source_callback_unref()
840  * shortly after mainloop_gio_destroy() completes
841  */
842  g_io_channel_unref(client->channel);
843  crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
844  } else {
845  errno = EINVAL;
846  }
847 
848  return client;
849 }
850 
851 void
853 {
854  if (client != NULL) {
855  crm_trace("Removing client %s[%p]", client->name, client);
856  if (client->source) {
857  /* Results in mainloop_gio_destroy() being called just
858  * before the source is removed from mainloop
859  */
860  g_source_remove(client->source);
861  }
862  }
863 }
864 
865 static GListPtr child_list = NULL;
866 
867 pid_t
869 {
870  return child->pid;
871 }
872 
873 const char *
875 {
876  return child->desc;
877 }
878 
879 int
881 {
882  return child->timeout;
883 }
884 
885 void *
887 {
888  return child->privatedata;
889 }
890 
891 void
893 {
894  child->privatedata = NULL;
895 }
896 
897 /* good function name */
898 static void
899 child_free(mainloop_child_t *child)
900 {
901  if (child->timerid != 0) {
902  crm_trace("Removing timer %d", child->timerid);
903  g_source_remove(child->timerid);
904  child->timerid = 0;
905  }
906  free(child->desc);
907  free(child);
908 }
909 
910 /* terrible function name */
911 static int
912 child_kill_helper(mainloop_child_t *child)
913 {
914  int rc;
915  if (child->flags & mainloop_leave_pid_group) {
916  crm_debug("Kill pid %d only. leave group intact.", child->pid);
917  rc = kill(child->pid, SIGKILL);
918  } else {
919  crm_debug("Kill pid %d's group", child->pid);
920  rc = kill(-child->pid, SIGKILL);
921  }
922 
923  if (rc < 0) {
924  if (errno != ESRCH) {
925  crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
926  }
927  return -errno;
928  }
929  return 0;
930 }
931 
932 static gboolean
933 child_timeout_callback(gpointer p)
934 {
935  mainloop_child_t *child = p;
936  int rc = 0;
937 
938  child->timerid = 0;
939  if (child->timeout) {
940  crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
941  return FALSE;
942  }
943 
944  rc = child_kill_helper(child);
945  if (rc == ESRCH) {
946  /* Nothing left to do. pid doesn't exist */
947  return FALSE;
948  }
949 
950  child->timeout = TRUE;
951  crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
952 
953  child->timerid = g_timeout_add(5000, child_timeout_callback, child);
954  return FALSE;
955 }
956 
/*
 * Collect the exit status of a tracked child and report it to its callback.
 *
 * flags: passed straight to waitpid() (e.g. WNOHANG)
 *
 * Returns FALSE when waitpid() returns 0, without calling the callback.
 * Returns TRUE otherwise; a waitpid() failure is reported to the callback
 * as signo=SIGCHLD with exitcode 1.
 */
static gboolean
child_waitpid(mainloop_child_t *child, int flags)
{
    int rc = 0;
    int core = 0;
    int signo = 0;
    int status = 0;
    int exitcode = 0;

    rc = waitpid(child->pid, &status, flags);
    if(rc == 0) {
        crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc);
        return FALSE;

    } else if(rc != child->pid) {
        /* waitpid() did not return our pid: hand the callback a synthetic failure */
        signo = SIGCHLD;
        exitcode = 1;
        status = 1;
        crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);

    } else {
        crm_trace("Managed process %d exited: %p", child->pid, child);

        if (WIFEXITED(status)) {
            exitcode = WEXITSTATUS(status);
            crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);

        } else if (WIFSIGNALED(status)) {
            signo = WTERMSIG(status);
            crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
        }
#ifdef WCOREDUMP
        if (WCOREDUMP(status)) {
            core = 1;
            crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
        }
#endif
    }

    if (child->callback) {
        child->callback(child, child->pid, core, signo, exitcode);
    }
    return TRUE;
}
1001 
1002 static void
1003 child_death_dispatch(int signal)
1004 {
1005  GListPtr iter = child_list;
1006  gboolean exited;
1007 
1008  while(iter) {
1009  GListPtr saved = NULL;
1010  mainloop_child_t *child = iter->data;
1011  exited = child_waitpid(child, WNOHANG);
1012 
1013  saved = iter;
1014  iter = iter->next;
1015 
1016  if (exited == FALSE) {
1017  continue;
1018  }
1019  crm_trace("Removing process entry %p for %d", child, child->pid);
1020 
1021  child_list = g_list_remove_link(child_list, saved);
1022  g_list_free(saved);
1023  child_free(child);
1024  }
1025 }
1026 
/* One-shot timer callback: installs the SIGCHLD handler and then polls the
 * tracked children once.  Returns FALSE so the timer is not repeated.
 */
static gboolean
child_signal_init(gpointer p)
{
    crm_trace("Installed SIGCHLD handler");
    /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
    mainloop_add_signal(SIGCHLD, child_death_dispatch);

    /* In case they terminated before the signal handler was installed */
    child_death_dispatch(SIGCHLD);
    return FALSE;
}
1038 
1039 int
1041 {
1042  GListPtr iter;
1043  mainloop_child_t *child = NULL;
1044  mainloop_child_t *match = NULL;
1045  /* It is impossible to block SIGKILL, this allows us to
1046  * call waitpid without WNOHANG flag.*/
1047  int waitflags = 0, rc = 0;
1048 
1049  for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1050  child = iter->data;
1051  if (pid == child->pid) {
1052  match = child;
1053  }
1054  }
1055 
1056  if (match == NULL) {
1057  return FALSE;
1058  }
1059 
1060  rc = child_kill_helper(match);
1061  if(rc == -ESRCH) {
1062  /* Its gone, but hasn't shown up in waitpid() yet
1063  *
1064  * Wait until we get SIGCHLD and let child_death_dispatch()
1065  * clean it up as normal (so we get the correct return
1066  * code/status)
1067  *
1068  * The blocking alternative would be to call:
1069  * child_waitpid(match, 0);
1070  */
1071  crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
1072  return TRUE;
1073 
1074  } else if(rc != 0) {
1075  /* If KILL for some other reason set the WNOHANG flag since we
1076  * can't be certain what happened.
1077  */
1078  waitflags = WNOHANG;
1079  }
1080 
1081  if (child_waitpid(match, waitflags) == FALSE) {
1082  /* not much we can do if this occurs */
1083  return FALSE;
1084  }
1085 
1086  child_list = g_list_remove(child_list, match);
1087  child_free(match);
1088  return TRUE;
1089 }
1090 
1091 /* Create/Log a new tracked process
1092  * To track a process group, use -pid
1093  */
1094 void
1095 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1096  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1097 {
1098  static bool need_init = TRUE;
1099  mainloop_child_t *child = g_new(mainloop_child_t, 1);
1100 
1101  child->pid = pid;
1102  child->timerid = 0;
1103  child->timeout = FALSE;
1104  child->privatedata = privatedata;
1105  child->callback = callback;
1106  child->flags = flags;
1107 
1108  if(desc) {
1109  child->desc = strdup(desc);
1110  }
1111 
1112  if (timeout) {
1113  child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1114  }
1115 
1116  child_list = g_list_append(child_list, child);
1117 
1118  if(need_init) {
1119  need_init = FALSE;
1120  /* SIGCHLD processing has to be invoked from mainloop.
1121  * We do not want it to be possible to both add a child pid
1122  * to mainloop, and have the pid's exit callback invoked within
1123  * the same callstack. */
1124  g_timeout_add(1, child_signal_init, NULL);
1125  }
1126 }
1127 
/* Start tracking a child process with no special flags (flags == 0) */
void
mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
                   void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
{
    mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
}
1134 
/* A named, restartable wrapper around a GLib timeout */
struct mainloop_timer_s {
    guint id;                   /* GLib source id while armed, 0 otherwise */
    guint period_ms;            /* interval handed to g_timeout_add() */
    bool repeat;                /* whether the timer re-fires after a callback that returns TRUE */
    char *name;                 /* heap-allocated label used in trace messages */
    GSourceFunc cb;             /* user callback */
    void *userdata;             /* passed to cb */
};
1143 
1144 struct mainloop_timer_s mainloop;
1145 
1146 static gboolean mainloop_timer_cb(gpointer user_data)
1147 {
1148  int id = 0;
1149  bool repeat = FALSE;
1150  struct mainloop_timer_s *t = user_data;
1151 
1152  CRM_ASSERT(t != NULL);
1153 
1154  id = t->id;
1155  t->id = 0; /* Ensure it's unset during callbacks so that
1156  * mainloop_timer_running() works as expected
1157  */
1158 
1159  if(t->cb) {
1160  crm_trace("Invoking callbacks for timer %s", t->name);
1161  repeat = t->repeat;
1162  if(t->cb(t->userdata) == FALSE) {
1163  crm_trace("Timer %s complete", t->name);
1164  repeat = FALSE;
1165  }
1166  }
1167 
1168  if(repeat) {
1169  /* Restore if repeating */
1170  t->id = id;
1171  }
1172 
1173  return repeat;
1174 }
1175 
1177 {
1178  if(t && t->id != 0) {
1179  return TRUE;
1180  }
1181  return FALSE;
1182 }
1183 
1185 {
1187  if(t && t->period_ms > 0) {
1188  crm_trace("Starting timer %s", t->name);
1189  t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1190  }
1191 }
1192 
1194 {
1195  if(t && t->id != 0) {
1196  crm_trace("Stopping timer %s", t->name);
1197  g_source_remove(t->id);
1198  t->id = 0;
1199  }
1200 }
1201 
1203 {
1204  guint last = 0;
1205 
1206  if(t) {
1207  last = t->period_ms;
1208  t->period_ms = period_ms;
1209  }
1210 
1211  if(t && t->id != 0 && last != t->period_ms) {
1213  }
1214  return last;
1215 }
1216 
1217 
1219 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1220 {
1221  mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
1222 
1223  if(t) {
1224  if(name) {
1225  t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1226  } else {
1227  t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1228  }
1229  t->id = 0;
1230  t->period_ms = period_ms;
1231  t->repeat = repeat;
1232  t->cb = cb;
1233  t->userdata = userdata;
1234  crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1235  }
1236  return t;
1237 }
1238 
1239 void
1241 {
1242  if(t) {
1243  crm_trace("Destroying timer %s", t->name);
1245  free(t->name);
1246  free(t);
1247  }
1248 }
1249 
void * mainloop_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:886
#define LOG_TRACE
Definition: logging.h:29
pid_t mainloop_child_pid(mainloop_child_t *child)
Definition: mainloop.c:868
bool mainloop_timer_running(mainloop_timer_t *t)
Definition: mainloop.c:1176
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
Definition: ipc.c:798
A dumping ground.
#define crm_notice(fmt, args...)
Definition: logging.h:250
struct signal_s crm_signal_t
#define crm_crit(fmt, args...)
Definition: logging.h:247
void mainloop_del_fd(mainloop_io_t *client)
Definition: mainloop.c:852
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
Definition: mainloop.c:617
mainloop_child_flags
Definition: mainloop.h:29
gboolean mainloop_destroy_signal(int sig)
Definition: mainloop.c:383
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Definition: mainloop.c:211
int crm_ipc_get_fd(crm_ipc_t *client)
Definition: ipc.c:867
const char * pcmk_strerror(int rc)
Definition: logging.c:1113
void mainloop_trigger_complete(crm_trigger_t *trig)
Definition: mainloop.c:199
const char * mainloop_child_name(mainloop_child_t *child)
Definition: mainloop.c:874
struct mainloop_timer_s mainloop_timer_t
Definition: mainloop.h:37
struct mainloop_io_s mainloop_io_t
Definition: mainloop.h:35
struct mainloop_child_s mainloop_child_t
Definition: mainloop.h:36
gboolean crm_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:300
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:90
long crm_ipc_read(crm_ipc_t *client)
Definition: ipc.c:965
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
Definition: mainloop.c:1219
uint32_t pid
Definition: internal.h:49
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
Definition: mainloop.c:765
struct qb_ipcs_poll_handlers gio_poll_funcs
Definition: mainloop.c:554
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Definition: mainloop.c:1202
Wrappers for and extensions to glib mainloop.
void crm_client_init(void)
Definition: ipc.c:249
void mainloop_timer_del(mainloop_timer_t *t)
Definition: mainloop.c:1240
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:799
const char * crm_ipc_buffer(crm_ipc_t *client)
Definition: ipc.c:1011
void mainloop_del_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:793
struct trigger_s crm_trigger_t
Definition: mainloop.h:34
void(* destroy)(gpointer)
Definition: mainloop.h:74
#define crm_warn(fmt, args...)
Definition: logging.h:249
int mainloop_child_timeout(mainloop_child_t *child)
Definition: mainloop.c:880
uint32_t id
Definition: internal.h:48
#define crm_debug(fmt, args...)
Definition: logging.h:253
struct crm_ipc_s crm_ipc_t
Definition: ipc.h:61
void(* destroy)(gpointer userdata)
Definition: mainloop.h:91
void mainloop_set_trigger(crm_trigger_t *source)
Definition: mainloop.c:223
#define crm_trace(fmt, args...)
Definition: logging.h:254
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:808
void mainloop_timer_start(mainloop_timer_t *t)
Definition: mainloop.c:1184
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition: mainloop.c:587
Wrappers for and extensions to libxml2.
void mainloop_clear_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:892
void mainloop_timer_stop(mainloop_timer_t *t)
Definition: mainloop.c:1193
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1095
unsigned int crm_ipc_default_buffer_size(void)
Definition: ipc.c:64
void crm_ipc_destroy(crm_ipc_t *client)
Definition: ipc.c:844
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1129
bool crm_ipc_connected(crm_ipc_t *client)
Definition: ipc.c:881
#define CRM_XS
Definition: logging.h:42
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
#define crm_err(fmt, args...)
Definition: logging.h:248
struct mainloop_timer_s mainloop
Definition: mainloop.c:1144
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Definition: ipc.c:770
#define CRM_ASSERT(expr)
Definition: error.h:35
char data[0]
Definition: internal.h:58
void mainloop_cleanup(void)
Definition: mainloop.c:409
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:325
int mainloop_child_kill(pid_t pid)
Definition: mainloop.c:1040
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
Definition: mainloop.c:231
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
void crm_ipc_close(crm_ipc_t *client)
Definition: ipc.c:829
GList * GListPtr
Definition: crm.h:192
#define crm_info(fmt, args...)
Definition: logging.h:251
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Definition: mainloop.h:73
uint64_t flags
Definition: remote.c:121
enum crm_ais_msg_types type
Definition: internal.h:51
#define int32_t
Definition: stdint.in.h:157