From 8450055f92bb6d16fdf0e45046b86c63b8fe1df9 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 10 Mar 2022 09:07:33 -0800 Subject: [PATCH] Ensure that stdin goes to all specified targets Currently support only rank=0, rank=all, or none. Signed-off-by: Ralph Castain --- src/mca/iof/hnp/iof_hnp.c | 134 +++++++++++++++---------------- src/prted/pmix/pmix_server_gen.c | 12 ++- src/tools/prte/prte.c | 38 ++++++--- src/tools/prun/prun.c | 38 ++++++--- 4 files changed, 125 insertions(+), 97 deletions(-) diff --git a/src/mca/iof/hnp/iof_hnp.c b/src/mca/iof/hnp/iof_hnp.c index ec8277e397..764ac8dade 100644 --- a/src/mca/iof/hnp/iof_hnp.c +++ b/src/mca/iof/hnp/iof_hnp.c @@ -84,14 +84,16 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz); * which operates independently and is in the iof_hnp_receive.c file */ -prte_iof_base_module_t prte_iof_hnp_module = {.init = init, - .push = hnp_push, - .pull = hnp_pull, - .close = hnp_close, - .output = hnp_output, - .complete = hnp_complete, - .finalize = finalize, - .push_stdin = push_stdin}; +prte_iof_base_module_t prte_iof_hnp_module = { + .init = init, + .push = hnp_push, + .pull = hnp_pull, + .close = hnp_close, + .output = hnp_output, + .complete = hnp_complete, + .finalize = finalize, + .push_stdin = push_stdin +}; /* Initialize the module */ static int init(void) @@ -192,7 +194,7 @@ static int hnp_push(const pmix_proc_t *dst_name, prte_iof_tag_t src_tag, int fd) */ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz) { - prte_iof_proc_t *proct, *pptr; + prte_iof_proc_t *proct; int rc; /* don't do this if the dst vpid is invalid */ @@ -205,66 +207,59 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz) PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), PRTE_NAME_PRINT(dst_name), sz)); /* do we already have this process in our list? */ - proct = NULL; - PMIX_LIST_FOREACH(pptr, &prte_iof_hnp_component.procs, prte_iof_proc_t) + PMIX_LIST_FOREACH(proct, &prte_iof_hnp_component.procs, prte_iof_proc_t) { - if (PMIX_CHECK_PROCID(&pptr->name, dst_name)) { - /* found it */ - proct = pptr; - } - } - if (NULL == proct) { - return PRTE_ERR_NOT_FOUND; - } - - /* did they direct that the data go to this proc? */ - if (NULL == proct->stdinev) { - /* nope - ignore it */ - return PRTE_SUCCESS; - } - - /* if the daemon is me, then this is a local sink */ - if (PMIX_CHECK_PROCID(PRTE_PROC_MY_NAME, &proct->stdinev->daemon)) { - PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, - "%s read %d bytes from stdin - writing to %s", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz, - PRTE_NAME_PRINT(&proct->name))); - /* send the bytes down the pipe - we even send 0 byte events - * down the pipe so it forces out any preceding data before - * closing the output stream - */ - if (NULL != proct->stdinev->wev) { - if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name, - PRTE_IOF_STDIN, data, sz, - proct->stdinev->wev)) { - /* getting too backed up - stop the read event for now if it is still active */ - - PRTE_OUTPUT_VERBOSE( - (1, prte_iof_base_framework.framework_output, "buffer backed up - holding")); - return PRTE_ERR_OUT_OF_RESOURCE; + if (PMIX_CHECK_PROCID(&proct->name, dst_name)) { + /* did they direct that the data go to this proc? */ + if (NULL == proct->stdinev) { + /* nope - ignore it */ + continue; } - } - } else { - PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, - "%s sending %d bytes from stdinev to daemon %s", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz, - PRTE_NAME_PRINT(&proct->stdinev->daemon))); - - /* send the data to the daemon so it can - * write it to the proc's fd - in this case, - * we pass sink->name to indicate who is to - * receive the data. If the connection closed, - * numbytes will be zero so zero bytes will be - * sent - this will tell the daemon to close - * the fd for stdin to that proc - */ - if (PRTE_SUCCESS - != (rc = prte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, - &proct->stdinev->name, PRTE_IOF_STDIN, data, - sz))) { - /* if the addressee is unknown, remove the sink from the list */ - if (PRTE_ERR_ADDRESSEE_UNKNOWN == rc) { - PMIX_RELEASE(proct->stdinev); + + /* if the daemon is me, then this is a local sink */ + if (PMIX_CHECK_PROCID(PRTE_PROC_MY_NAME, &proct->stdinev->daemon)) { + PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, + "%s read %d bytes from stdin - writing to %s", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz, + PRTE_NAME_PRINT(&proct->name))); + /* send the bytes down the pipe - we even send 0 byte events + * down the pipe so it forces out any preceding data before + * closing the output stream + */ + if (NULL != proct->stdinev->wev) { + if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name, + PRTE_IOF_STDIN, data, sz, + proct->stdinev->wev)) { + /* getting too backed up - stop the read event for now if it is still active */ + + PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, + "buffer backed up - holding")); + return PRTE_ERR_OUT_OF_RESOURCE; + } + } + } else { + PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, + "%s sending %d bytes from stdinev to daemon %s", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz, + PRTE_NAME_PRINT(&proct->stdinev->daemon))); + + /* send the data to the daemon so it can + * write it to the proc's fd - in this case, + * we pass sink->name to indicate who is to + * receive the data. If the connection closed, + * numbytes will be zero so zero bytes will be + * sent - this will tell the daemon to close + * the fd for stdin to that proc + */ + if (PRTE_SUCCESS + != (rc = prte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, + &proct->stdinev->name, PRTE_IOF_STDIN, data, + sz))) { + /* if the addressee is unknown, remove the sink from the list */ + if (PRTE_ERR_ADDRESSEE_UNKNOWN == rc) { + PMIX_RELEASE(proct->stdinev); + } + } } } } @@ -409,8 +404,9 @@ static void stdin_write_handler(int fd, short event, void *cbdata) PMIX_ACQUIRE_OBJECT(sink); PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, - "%s hnp:stdin:write:handler writing data to %d", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), wev->fd)); + "%s hnp:stdin:write:handler writing %d data to %d", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), + (int)pmix_list_get_size(&wev->outputs), wev->fd)); wev->pending = false; diff --git a/src/prted/pmix/pmix_server_gen.c b/src/prted/pmix/pmix_server_gen.c index 7dc867b535..8f8a84ef3c 100644 --- a/src/prted/pmix/pmix_server_gen.c +++ b/src/prted/pmix/pmix_server_gen.c @@ -761,8 +761,10 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p prte_grpcomm_signature_t *sig; pmix_proc_t *proct; - prte_output_verbose(2, prte_pmix_server_globals.output, "%s job control request from %s:%d", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), requestor->nspace, requestor->rank); + prte_output_verbose(2, prte_pmix_server_globals.output, + "%s job control request from %s:%d", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), + requestor->nspace, requestor->rank); for (m = 0; m < ndirs; m++) { if (0 == strncmp(directives[m].key, PMIX_JOB_CTRL_KILL, PMIX_MAX_KEYLEN)) { @@ -1090,8 +1092,10 @@ static void pmix_server_stdin_push(int sd, short args, void *cbdata) size_t n; for (n = 0; n < cd->nprocs; n++) { - PRTE_OUTPUT_VERBOSE((1, prte_debug_output, "%s pmix_server_stdin_push to dest %s: size %zu", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), PRTE_NAME_PRINT(&cd->procs[n]), + PRTE_OUTPUT_VERBOSE((1, prte_pmix_server_globals.output, + "%s pmix_server_stdin_push to dest %s: size %zu", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), + PRTE_NAME_PRINT(&cd->procs[n]), bo->size)); prte_iof.push_stdin(&cd->procs[n], (uint8_t *) bo->bytes, bo->size); } diff --git a/src/tools/prte/prte.c b/src/tools/prte/prte.c index ac248a2434..bca76365ff 100644 --- a/src/tools/prte/prte.c +++ b/src/tools/prte/prte.c @@ -1129,19 +1129,33 @@ int main(int argc, char *argv[]) prte_output(0, "JOB %s EXECUTING", PRTE_JOBID_PRINT(spawnednspace)); } - /* push our stdin to the apps */ - PMIX_LOAD_PROCID(&pname, spawnednspace, 0); // forward stdin to rank=0 - PMIX_INFO_CREATE(iptr, 1); - PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL); - PRTE_PMIX_CONSTRUCT_LOCK(&lock); - ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock); - if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) { - prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret)); - } else if (PMIX_SUCCESS == ret) { - PRTE_PMIX_WAIT_THREAD(&lock); + /* check what user wants us to do with stdin */ + PMIX_LOAD_NSPACE(pname.nspace, spawnednspace); + opt = pmix_cmd_line_get_param(&results, PRTE_CLI_STDIN); + if (NULL != opt) { + if (0 == strcmp(opt->values[0], "all")) { + pname.rank = PMIX_RANK_WILDCARD; + } else if (0 == strcmp(opt->values[0], "none")) { + pname.rank = PMIX_RANK_INVALID; + } else { + pname.rank = 0; + } + } else { + pname.rank = 0; + } + if (PMIX_RANK_INVALID != pname.rank) { + PMIX_INFO_CREATE(iptr, 1); + PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL); + PRTE_PMIX_CONSTRUCT_LOCK(&lock); + ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock); + if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) { + prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret)); + } else if (PMIX_SUCCESS == ret) { + PRTE_PMIX_WAIT_THREAD(&lock); + } + PRTE_PMIX_DESTRUCT_LOCK(&lock); + PMIX_INFO_FREE(iptr, 1); } - PRTE_PMIX_DESTRUCT_LOCK(&lock); - PMIX_INFO_FREE(iptr, 1); proceed: /* loop the event lib until an exit event is detected */ diff --git a/src/tools/prun/prun.c b/src/tools/prun/prun.c index b9e979d942..bfe2b23d5f 100644 --- a/src/tools/prun/prun.c +++ b/src/tools/prun/prun.c @@ -958,19 +958,33 @@ int prun(int argc, char *argv[]) PRTE_PMIX_DESTRUCT_LOCK(&lock); PMIX_INFO_FREE(iptr, 2); - /* push our stdin to the apps */ - PMIX_LOAD_PROCID(&pname, spawnednspace, 0); // forward stdin to rank=0 - PMIX_INFO_CREATE(iptr, 1); - PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL); - PRTE_PMIX_CONSTRUCT_LOCK(&lock); - ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock); - if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) { - prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret)); - } else if (PMIX_SUCCESS == ret) { - PRTE_PMIX_WAIT_THREAD(&lock); + /* check what user wants us to do with stdin */ + PMIX_LOAD_NSPACE(pname.nspace, spawnednspace); + opt = pmix_cmd_line_get_param(&results, PRTE_CLI_STDIN); + if (NULL != opt) { + if (0 == strcmp(opt->values[0], "all")) { + pname.rank = PMIX_RANK_WILDCARD; + } else if (0 == strcmp(opt->values[0], "none")) { + pname.rank = PMIX_RANK_INVALID; + } else { + pname.rank = 0; + } + } else { + pname.rank = 0; + } + if (PMIX_RANK_INVALID != pname.rank) { + PMIX_INFO_CREATE(iptr, 1); + PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL); + PRTE_PMIX_CONSTRUCT_LOCK(&lock); + ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock); + if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) { + prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret)); + } else if (PMIX_SUCCESS == ret) { + PRTE_PMIX_WAIT_THREAD(&lock); + } + PRTE_PMIX_DESTRUCT_LOCK(&lock); + PMIX_INFO_FREE(iptr, 1); } - PRTE_PMIX_DESTRUCT_LOCK(&lock); - PMIX_INFO_FREE(iptr, 1); /* register to be notified when * our job completes */