Skip to content

Commit

Permalink
Remove the event base param to prte_wait_cb
Browse files Browse the repository at this point in the history
Reduce the potential for mistakes by restricting the
wait callbacks to run only in the main event thread.

Signed-off-by: Ralph Castain <rhc@pmix.org>
  • Loading branch information
rhc54 committed Nov 19, 2023
1 parent 066b560 commit ad63c0f
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 19 deletions.
3 changes: 1 addition & 2 deletions src/mca/errmgr/prted/errmgr_prted.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,7 @@ static void proc_errors(int fd, short args, void *cbdata)
t2 = PMIX_NEW(prte_wait_tracker_t);
PMIX_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = prte_event_base;
prte_event_set(t2->evb, &t2->ev, -1, PRTE_EV_WRITE,
prte_event_set(prte_event_base, &t2->ev, -1, PRTE_EV_WRITE,
prte_odls_base_default_wait_local_proc, t2);
prte_event_active(&t2->ev, PRTE_EV_WRITE, 1);
goto cleanup;
Expand Down
4 changes: 2 additions & 2 deletions src/mca/odls/base/odls_base_default_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,7 @@ void prte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
/* set the waitpid callback here for thread protection and
* to ensure we can capture the callback on short-lived apps */
PRTE_FLAG_SET(child, PRTE_PROC_FLAG_ALIVE);
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, prte_event_base, NULL);
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, NULL);

/* dispatch this child to the next available launch thread */
cd = PMIX_NEW(prte_odls_spawn_caddy_t);
Expand Down Expand Up @@ -2053,7 +2053,7 @@ int prte_odls_base_default_restart_proc(prte_proc_t *child,
prte_odls_globals.next_base = 0;
}
evb = prte_odls_globals.ev_bases[prte_odls_globals.next_base];
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, prte_event_base, NULL);
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, NULL);

PMIX_OUTPUT_VERBOSE((5, prte_odls_base_framework.framework_output, "%s restarting app %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), app->app));
Expand Down
2 changes: 1 addition & 1 deletion src/mca/plm/alps/plm_alps_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ static int plm_alps_start_proc(int argc, char **argv, char **env, char *prefix)
/* be sure to mark it as alive so we don't instantly fire */
PRTE_FLAG_SET(alpsrun, PRTE_PROC_FLAG_ALIVE);
/* setup the waitpid so we can find out if alps succeeds! */
prte_wait_cb(alpsrun, alps_wait_cb, prte_event_base, NULL);
prte_wait_cb(alpsrun, alps_wait_cb, NULL);

if (0 == alps_pid) { /* child */
char *bin_base = NULL, *lib_base = NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/mca/plm/pals/plm_pals_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ static int plm_pals_start_proc(int argc, char **argv, char **env, char *prefix)
/* be sure to mark it as alive so we don't instantly fire */
PRTE_FLAG_SET(palsrun, PRTE_PROC_FLAG_ALIVE);
/* setup the waitpid so we can find out if pals succeeds! */
prte_wait_cb(palsrun, pals_wait_cb, prte_event_base, NULL);
prte_wait_cb(palsrun, pals_wait_cb, NULL);

if (0 == pals_pid) { /* child */
char *bin_base = NULL, *lib_base = NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/mca/plm/slurm/plm_slurm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ static int plm_slurm_start_proc(int argc, char **argv, char *prefix)
/* be sure to mark it as alive so we don't instantly fire */
PRTE_FLAG_SET(dummy, PRTE_PROC_FLAG_ALIVE);
/* setup the waitpid so we can find out if srun succeeds! */
prte_wait_cb(dummy, srun_wait_cb, prte_event_base, NULL);
prte_wait_cb(dummy, srun_wait_cb, NULL);

if (0 == srun_pid) { /* child */
char *bin_base = NULL, *lib_base = NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/mca/plm/ssh/plm_ssh_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ static void process_launch_list(int fd, short args, void *cbdata)
caddy = (prte_plm_ssh_caddy_t *) item;
/* register the sigchild callback */
PRTE_FLAG_SET(caddy->daemon, PRTE_PROC_FLAG_ALIVE);
prte_wait_cb(caddy->daemon, ssh_wait_daemon, prte_event_base, (void *) caddy);
prte_wait_cb(caddy->daemon, ssh_wait_daemon, (void *) caddy);

/* fork a child to exec the ssh/ssh session */
pid = fork();
Expand Down
18 changes: 10 additions & 8 deletions src/runtime/prte_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ static void timer_dest(prte_timer_t *tm)
{
prte_event_free(tm->ev);
}
PMIX_CLASS_INSTANCE(prte_timer_t, pmix_object_t, timer_const, timer_dest);
PMIX_CLASS_INSTANCE(prte_timer_t, pmix_object_t,
timer_const, timer_dest);

static void wccon(prte_wait_tracker_t *p)
{
Expand All @@ -89,7 +90,8 @@ static void wcdes(prte_wait_tracker_t *p)
PMIX_RELEASE(p->child);
}
}
PMIX_CLASS_INSTANCE(prte_wait_tracker_t, pmix_list_item_t, wccon, wcdes);
PMIX_CLASS_INSTANCE(prte_wait_tracker_t, pmix_list_item_t,
wccon, wcdes);

/* Local Variables */
static prte_event_t handler;
Expand All @@ -114,7 +116,8 @@ int prte_wait_init(void)
{
PMIX_CONSTRUCT(&pending_cbs, pmix_list_t);

prte_event_set(prte_event_base, &handler, SIGCHLD, PRTE_EV_SIGNAL | PRTE_EV_PERSIST,
prte_event_set(prte_event_base, &handler, SIGCHLD,
PRTE_EV_SIGNAL | PRTE_EV_PERSIST,
wait_signal_callback, &handler);

prte_event_add(&handler, NULL);
Expand All @@ -133,7 +136,8 @@ int prte_wait_finalize(void)

/* this function *must* always be called from
* within an event in the prte_event_base */
void prte_wait_cb(prte_proc_t *child, prte_wait_cbfunc_t callback, prte_event_base_t *evb,
void prte_wait_cb(prte_proc_t *child,
prte_wait_cbfunc_t callback,
void *data)
{
prte_wait_tracker_t *t2;
Expand All @@ -151,10 +155,9 @@ void prte_wait_cb(prte_proc_t *child, prte_wait_cbfunc_t callback, prte_event_ba
t2 = PMIX_NEW(prte_wait_tracker_t);
PMIX_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = evb;
t2->cbfunc = callback;
t2->cbdata = data;
prte_event_set(t2->evb, &t2->ev, -1, PRTE_EV_WRITE, t2->cbfunc, t2);
prte_event_set(prte_event_base, &t2->ev, -1, PRTE_EV_WRITE, t2->cbfunc, t2);
prte_event_active(&t2->ev, PRTE_EV_WRITE, 1);
}
return;
Expand All @@ -173,7 +176,6 @@ void prte_wait_cb(prte_proc_t *child, prte_wait_cbfunc_t callback, prte_event_ba
t2 = PMIX_NEW(prte_wait_tracker_t);
PMIX_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = evb;
t2->cbfunc = callback;
t2->cbdata = data;
pmix_list_append(&pending_cbs, &t2->super);
Expand Down Expand Up @@ -254,7 +256,7 @@ static void wait_signal_callback(int fd, short event, void *arg)
t2->child->exit_code = status;
pmix_list_remove_item(&pending_cbs, &t2->super);
if (NULL != t2->cbfunc) {
prte_event_set(t2->evb, &t2->ev, -1, PRTE_EV_WRITE, t2->cbfunc, t2);
prte_event_set(prte_event_base, &t2->ev, -1, PRTE_EV_WRITE, t2->cbfunc, t2);
prte_event_active(&t2->ev, PRTE_EV_WRITE, 1);
} else {
PMIX_RELEASE(t2);
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/prte_wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ typedef void (*prte_wait_cbfunc_t)(int fd, short args, void *cb);
typedef struct {
pmix_list_item_t super;
prte_event_t ev;
prte_event_base_t *evb;
prte_proc_t *child;
prte_wait_cbfunc_t cbfunc;
void *cbdata;
Expand All @@ -86,8 +85,9 @@ PRTE_EXPORT void prte_wait_disable(void);
* \c waitpid() will have already been called on the process at this
* time.
*/
PRTE_EXPORT void prte_wait_cb(prte_proc_t *proc, prte_wait_cbfunc_t callback,
prte_event_base_t *evb, void *data);
PRTE_EXPORT void prte_wait_cb(prte_proc_t *proc,
prte_wait_cbfunc_t callback,
void *data);

PRTE_EXPORT void prte_wait_cb_cancel(prte_proc_t *proc);

Expand Down

0 comments on commit ad63c0f

Please sign in to comment.