36 struct mainloop_child_s {
60 crm_trigger_prepare(GSource * source, gint * timeout)
87 crm_trigger_check(GSource * source)
95 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
104 trig->trigger = FALSE;
107 rc = callback(trig->user_data);
109 crm_trace(
"Trigger handler %p not yet complete", trig);
110 trig->running = TRUE;
118 crm_trigger_finalize(GSource * source)
120 crm_trace(
"Trigger %p destroyed", source);
126 gpointer callback_data;
127 GSourceCallbackFuncs *callback_funcs;
129 const GSourceFuncs *source_funcs;
132 GMainContext *context;
149 g_source_refcount(GSource * source)
153 struct _GSourceCopy *evil = (
struct _GSourceCopy*)source;
154 return evil->ref_count;
159 static int g_source_refcount(GSource * source)
165 static GSourceFuncs crm_trigger_funcs = {
168 crm_trigger_dispatch,
169 crm_trigger_finalize,
173 mainloop_setup_trigger(GSource * source,
int priority,
int (*dispatch) (gpointer user_data),
181 trigger->trigger = FALSE;
182 trigger->user_data = userdata;
185 g_source_set_callback(source, dispatch, trigger, NULL);
188 g_source_set_priority(source, priority);
189 g_source_set_can_recurse(source, FALSE);
191 crm_trace(
"Setup %p with ref-count=%u", source, g_source_refcount(source));
192 trigger->id = g_source_attach(source, NULL);
193 crm_trace(
"Attached %p with ref-count=%u", source, g_source_refcount(source));
201 crm_trace(
"Trigger handler %p complete", trig);
202 trig->running = FALSE;
213 GSource *source = NULL;
216 source = g_source_new(&crm_trigger_funcs,
sizeof(
crm_trigger_t));
219 return mainloop_setup_trigger(source, priority, dispatch, userdata);
226 source->trigger = TRUE;
239 gs = (GSource *)source;
241 if(g_source_refcount(gs) > 2) {
242 crm_info(
"Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
245 g_source_destroy(gs);
257 typedef struct signal_s {
259 void (*handler) (
int sig);
267 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
271 if(sig->signal != SIGCHLD) {
273 strsignal(sig->signal), sig->signal,
274 (sig->handler?
"invoking" :
"no"));
277 sig->trigger.trigger = FALSE;
279 sig->handler(sig->signal);
285 mainloop_signal_handler(
int sig)
287 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
292 static GSourceFuncs crm_signal_funcs = {
296 crm_trigger_finalize,
304 struct sigaction old;
306 if (sigemptyset(&mask) < 0) {
307 crm_perror(LOG_ERR,
"Call to sigemptyset failed");
311 memset(&sa, 0,
sizeof(
struct sigaction));
312 sa.sa_handler = dispatch;
313 sa.sa_flags = SA_RESTART;
316 if (sigaction(sig, &sa, &old) < 0) {
317 crm_perror(LOG_ERR,
"Could not install signal handler for signal %d", sig);
327 GSource *source = NULL;
328 int priority = G_PRIORITY_HIGH - 1;
330 if (sig == SIGTERM) {
338 if (sig >= NSIG || sig < 0) {
339 crm_err(
"Signal %d is out of range", sig);
342 }
else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
343 crm_trace(
"Signal handler for %d is already installed", sig);
346 }
else if (crm_signals[sig] != NULL) {
347 crm_err(
"Different signal handler for %d is already installed", sig);
352 source = g_source_new(&crm_signal_funcs,
sizeof(
crm_signal_t));
354 crm_signals[sig] = (
crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
357 crm_signals[sig]->handler = dispatch;
358 crm_signals[sig]->signal = sig;
360 if (
crm_signal(sig, mainloop_signal_handler) == FALSE) {
363 crm_signals[sig] = NULL;
374 if (siginterrupt(sig, 1) < 0) {
375 crm_perror(LOG_INFO,
"Could not enable system call interruptions for signal %d", sig);
387 if (sig >= NSIG || sig < 0) {
388 crm_err(
"Signal %d is out of range", sig);
392 crm_perror(LOG_ERR,
"Could not uninstall signal handler for signal %d", sig);
395 }
else if (crm_signals[sig] == NULL) {
400 tmp = crm_signals[sig];
401 crm_signals[sig] = NULL;
406 static qb_array_t *gio_map = NULL;
412 qb_array_free(gio_map);
419 struct gio_to_qb_poll {
424 qb_ipcs_dispatch_fn_t fn;
425 enum qb_loop_priority p;
429 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer
data)
431 struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)data;
432 gint fd = g_io_channel_unix_get_fd(gio);
434 crm_trace(
"%p.%d %d", data, fd, condition);
440 return (adaptor->fn(fd, condition, adaptor->data) == 0);
444 gio_poll_destroy(gpointer data)
446 struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)data;
451 if (adaptor->is_used == 0) {
452 crm_trace(
"Marking adaptor %p unused", adaptor);
458 gio_poll_dispatch_update(
enum qb_loop_priority p,
int32_t fd,
int32_t evts,
459 void *data, qb_ipcs_dispatch_fn_t fn,
int32_t add)
461 struct gio_to_qb_poll *adaptor;
465 res = qb_array_index(gio_map, fd, (
void **)&adaptor);
467 crm_err(
"Array lookup failed for fd=%d: %d", fd, res);
471 crm_trace(
"Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
473 if (add && adaptor->source) {
474 crm_err(
"Adaptor for descriptor %d is still in-use", fd);
477 if (!add && !adaptor->is_used) {
478 crm_err(
"Adaptor for descriptor %d is not in-use", fd);
483 channel = g_io_channel_unix_new(fd);
485 crm_err(
"No memory left to add fd=%d", fd);
489 if (adaptor->source) {
490 g_source_remove(adaptor->source);
495 evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
498 adaptor->events = evts;
499 adaptor->data =
data;
503 g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
515 g_io_channel_unref(channel);
517 crm_trace(
"Added to mainloop with gsource id=%d", adaptor->source);
518 if (adaptor->source > 0) {
526 gio_poll_dispatch_add(
enum qb_loop_priority p,
int32_t fd,
int32_t evts,
527 void *data, qb_ipcs_dispatch_fn_t fn)
529 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
533 gio_poll_dispatch_mod(
enum qb_loop_priority p,
int32_t fd,
int32_t evts,
534 void *data, qb_ipcs_dispatch_fn_t fn)
536 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
540 gio_poll_dispatch_del(
int32_t fd)
542 struct gio_to_qb_poll *adaptor;
545 if (qb_array_index(gio_map, fd, (
void **)&adaptor) == 0) {
546 if (adaptor->source) {
547 g_source_remove(adaptor->source);
556 .dispatch_add = gio_poll_dispatch_add,
557 .dispatch_mod = gio_poll_dispatch_mod,
558 .dispatch_del = gio_poll_dispatch_del,
561 static enum qb_ipc_type
562 pick_ipc_type(
enum qb_ipc_type requested)
564 const char *env = getenv(
"PCMK_ipc_type");
566 if (env && strcmp(
"shared-mem", env) == 0) {
568 }
else if (env && strcmp(
"socket", env) == 0) {
569 return QB_IPC_SOCKET;
570 }
else if (env && strcmp(
"posix", env) == 0) {
571 return QB_IPC_POSIX_MQ;
572 }
else if (env && strcmp(
"sysv", env) == 0) {
573 return QB_IPC_SYSV_MQ;
574 }
else if (requested == QB_IPC_NATIVE) {
588 struct qb_ipcs_service_handlers * callbacks)
591 qb_ipcs_service_t *server = NULL;
593 if (gio_map == NULL) {
594 gio_map = qb_array_create_2(64,
sizeof(
struct gio_to_qb_poll), 1);
598 server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
600 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
605 qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
607 rc = qb_ipcs_run(server);
620 qb_ipcs_destroy(server);
624 struct mainloop_io_s {
633 int (*dispatch_fn_ipc) (
const char *buffer, ssize_t length, gpointer userdata);
634 int (*dispatch_fn_io) (gpointer userdata);
635 void (*destroy_fn) (gpointer userdata);
640 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
642 gboolean keep = TRUE;
645 CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
647 if (condition & G_IO_IN) {
655 crm_trace(
"Message acquisition from %s[%p] failed: %s (%ld)",
658 }
else if (client->dispatch_fn_ipc) {
661 crm_trace(
"New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
662 if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
663 crm_trace(
"Connection to %s no longer required", client->name);
668 }
while (keep && rc > 0 && --max > 0);
671 crm_trace(
"New message from %s[%p] %u", client->name, client, condition);
672 if (client->dispatch_fn_io) {
673 if (client->dispatch_fn_io(client->userdata) < 0) {
674 crm_trace(
"Connection to %s no longer required", client->name);
682 crm_err(
"Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
685 }
else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
686 crm_trace(
"The connection %s[%p] has been closed (I/O condition=%d)",
687 client->name, client, condition);
690 }
else if ((condition & G_IO_IN) == 0) {
718 crm_err(
"Strange condition: %d", condition);
728 mainloop_gio_destroy(gpointer c)
731 char *c_name = strdup(client->name);
736 crm_trace(
"Destroying client %s[%p]", c_name, c);
742 if (client->destroy_fn) {
743 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
745 client->destroy_fn = NULL;
746 destroy_fn(client->userdata);
756 crm_trace(
"Destroyed client %s[%p]", c_name, c);
758 free(client->name); client->name = NULL;
777 if (client == NULL) {
787 client->destroy_fn = callbacks->
destroy;
788 client->dispatch_fn_ipc = callbacks->
dispatch;
815 if (client == NULL) {
818 client->name = strdup(name);
819 client->userdata = userdata;
822 client->destroy_fn = callbacks->
destroy;
823 client->dispatch_fn_io = callbacks->
dispatch;
827 client->channel = g_io_channel_unix_new(fd);
829 g_io_add_watch_full(client->channel, priority,
830 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
831 client, mainloop_gio_destroy);
842 g_io_channel_unref(client->channel);
843 crm_trace(
"Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
854 if (client != NULL) {
855 crm_trace(
"Removing client %s[%p]", client->name, client);
856 if (client->source) {
860 g_source_remove(client->source);
882 return child->timeout;
888 return child->privatedata;
894 child->privatedata = NULL;
901 if (child->timerid != 0) {
902 crm_trace(
"Removing timer %d", child->timerid);
903 g_source_remove(child->timerid);
916 crm_debug(
"Kill pid %d only. leave group intact.", child->pid);
917 rc = kill(child->pid, SIGKILL);
919 crm_debug(
"Kill pid %d's group", child->pid);
920 rc = kill(-child->pid, SIGKILL);
924 if (errno != ESRCH) {
925 crm_perror(LOG_ERR,
"kill(%d, KILL) failed", child->pid);
933 child_timeout_callback(gpointer p)
939 if (child->timeout) {
940 crm_crit(
"%s process (PID %d) will not die!", child->desc, (
int)child->pid);
944 rc = child_kill_helper(child);
950 child->timeout = TRUE;
951 crm_warn(
"%s process (PID %d) timed out", child->desc, (
int)child->pid);
953 child->timerid = g_timeout_add(5000, child_timeout_callback, child);
966 rc = waitpid(child->pid, &status, flags);
968 crm_perror(LOG_DEBUG,
"wait(%d) = %d", child->pid, rc);
971 }
else if(rc != child->pid) {
975 crm_perror(LOG_ERR,
"Call to waitpid(%d) failed", child->pid);
978 crm_trace(
"Managed process %d exited: %p", child->pid, child);
980 if (WIFEXITED(status)) {
981 exitcode = WEXITSTATUS(status);
982 crm_trace(
"Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
984 }
else if (WIFSIGNALED(status)) {
985 signo = WTERMSIG(status);
986 crm_trace(
"Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
989 if (WCOREDUMP(status)) {
991 crm_err(
"Managed process %d (%s) dumped core", child->pid, child->desc);
996 if (child->callback) {
997 child->callback(child, child->pid, core, signo, exitcode);
1003 child_death_dispatch(
int signal)
1011 exited = child_waitpid(child, WNOHANG);
1016 if (exited == FALSE) {
1019 crm_trace(
"Removing process entry %p for %d", child, child->pid);
1021 child_list = g_list_remove_link(child_list, saved);
1028 child_signal_init(gpointer p)
1035 child_death_dispatch(SIGCHLD);
1047 int waitflags = 0, rc = 0;
1049 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1051 if (pid == child->pid) {
1056 if (match == NULL) {
1060 rc = child_kill_helper(match);
1071 crm_trace(
"Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
1074 }
else if(rc != 0) {
1078 waitflags = WNOHANG;
1081 if (child_waitpid(match, waitflags) == FALSE) {
1086 child_list = g_list_remove(child_list, match);
1096 void (*callback) (
mainloop_child_t * p, pid_t pid,
int core,
int signo,
int exitcode))
1098 static bool need_init = TRUE;
1103 child->timeout = FALSE;
1104 child->privatedata = privatedata;
1105 child->callback = callback;
1106 child->flags =
flags;
1109 child->desc = strdup(desc);
1113 child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1116 child_list = g_list_append(child_list, child);
1124 g_timeout_add(1, child_signal_init, NULL);
1130 void (*callback) (
mainloop_child_t * p, pid_t pid,
int core,
int signo,
int exitcode))
1135 struct mainloop_timer_s {
1146 static gboolean mainloop_timer_cb(gpointer user_data)
1149 bool repeat = FALSE;
1150 struct mainloop_timer_s *t = user_data;
1160 crm_trace(
"Invoking callbacks for timer %s", t->name);
1162 if(t->cb(t->userdata) == FALSE) {
1163 crm_trace(
"Timer %s complete", t->name);
1178 if(t && t->id != 0) {
1187 if(t && t->period_ms > 0) {
1188 crm_trace(
"Starting timer %s", t->name);
1189 t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1195 if(t && t->id != 0) {
1196 crm_trace(
"Stopping timer %s", t->name);
1197 g_source_remove(t->id);
1207 last = t->period_ms;
1208 t->period_ms = period_ms;
1211 if(t && t->id != 0 && last != t->period_ms) {
1230 t->period_ms = period_ms;
1233 t->userdata = userdata;
1234 crm_trace(
"Created timer %s with %p %p", t->name, userdata, t->userdata);
1243 crm_trace(
"Destroying timer %s", t->name);
void * mainloop_child_userdata(mainloop_child_t *child)
pid_t mainloop_child_pid(mainloop_child_t *child)
bool mainloop_timer_running(mainloop_timer_t *t)
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
#define crm_notice(fmt, args...)
struct signal_s crm_signal_t
#define crm_crit(fmt, args...)
void mainloop_del_fd(mainloop_io_t *client)
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
gboolean mainloop_destroy_signal(int sig)
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
int crm_ipc_get_fd(crm_ipc_t *client)
const char * pcmk_strerror(int rc)
void mainloop_trigger_complete(crm_trigger_t *trig)
const char * mainloop_child_name(mainloop_child_t *child)
struct mainloop_timer_s mainloop_timer_t
struct mainloop_io_s mainloop_io_t
struct mainloop_child_s mainloop_child_t
gboolean crm_signal(int sig, void(*dispatch)(int sig))
int(* dispatch)(gpointer userdata)
long crm_ipc_read(crm_ipc_t *client)
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
struct qb_ipcs_poll_handlers gio_poll_funcs
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Wrappers for and extensions to glib mainloop.
void crm_client_init(void)
void mainloop_timer_del(mainloop_timer_t *t)
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
const char * crm_ipc_buffer(crm_ipc_t *client)
void mainloop_del_ipc_client(mainloop_io_t *client)
struct trigger_s crm_trigger_t
void(* destroy)(gpointer)
#define crm_warn(fmt, args...)
int mainloop_child_timeout(mainloop_child_t *child)
#define crm_debug(fmt, args...)
struct crm_ipc_s crm_ipc_t
void(* destroy)(gpointer userdata)
void mainloop_set_trigger(crm_trigger_t *source)
#define crm_trace(fmt, args...)
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
void mainloop_timer_start(mainloop_timer_t *t)
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Wrappers for and extensions to libxml2.
void mainloop_clear_child_userdata(mainloop_child_t *child)
void mainloop_timer_stop(mainloop_timer_t *t)
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
unsigned int crm_ipc_default_buffer_size(void)
void crm_ipc_destroy(crm_ipc_t *client)
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
bool crm_ipc_connected(crm_ipc_t *client)
#define crm_perror(level, fmt, args...)
Log a system error message.
#define crm_err(fmt, args...)
struct mainloop_timer_s mainloop
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
void mainloop_cleanup(void)
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
int mainloop_child_kill(pid_t pid)
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
void crm_ipc_close(crm_ipc_t *client)
#define crm_info(fmt, args...)
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
enum crm_ais_msg_types type