Skip to content

Commit

Permalink
Merge pull request #1261 from rhc54/topic/stdin
Browse files Browse the repository at this point in the history
Ensure that stdin goes to all specified targets
  • Loading branch information
rhc54 authored Mar 10, 2022
2 parents bea9374 + 8450055 commit f484c2f
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 97 deletions.
134 changes: 65 additions & 69 deletions src/mca/iof/hnp/iof_hnp.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz);
* which operates independently and is in the iof_hnp_receive.c file
*/

prte_iof_base_module_t prte_iof_hnp_module = {.init = init,
.push = hnp_push,
.pull = hnp_pull,
.close = hnp_close,
.output = hnp_output,
.complete = hnp_complete,
.finalize = finalize,
.push_stdin = push_stdin};
prte_iof_base_module_t prte_iof_hnp_module = {
.init = init,
.push = hnp_push,
.pull = hnp_pull,
.close = hnp_close,
.output = hnp_output,
.complete = hnp_complete,
.finalize = finalize,
.push_stdin = push_stdin
};

/* Initialize the module */
static int init(void)
Expand Down Expand Up @@ -192,7 +194,7 @@ static int hnp_push(const pmix_proc_t *dst_name, prte_iof_tag_t src_tag, int fd)
*/
static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz)
{
prte_iof_proc_t *proct, *pptr;
prte_iof_proc_t *proct;
int rc;

/* don't do this if the dst vpid is invalid */
Expand All @@ -205,66 +207,59 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz)
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), PRTE_NAME_PRINT(dst_name), sz));

/* do we already have this process in our list? */
proct = NULL;
PMIX_LIST_FOREACH(pptr, &prte_iof_hnp_component.procs, prte_iof_proc_t)
PMIX_LIST_FOREACH(proct, &prte_iof_hnp_component.procs, prte_iof_proc_t)
{
if (PMIX_CHECK_PROCID(&pptr->name, dst_name)) {
/* found it */
proct = pptr;
}
}
if (NULL == proct) {
return PRTE_ERR_NOT_FOUND;
}

/* did they direct that the data go to this proc? */
if (NULL == proct->stdinev) {
/* nope - ignore it */
return PRTE_SUCCESS;
}

/* if the daemon is me, then this is a local sink */
if (PMIX_CHECK_PROCID(PRTE_PROC_MY_NAME, &proct->stdinev->daemon)) {
PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s read %d bytes from stdin - writing to %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->name)));
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdinev->wev) {
if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name,
PRTE_IOF_STDIN, data, sz,
proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */

PRTE_OUTPUT_VERBOSE(
(1, prte_iof_base_framework.framework_output, "buffer backed up - holding"));
return PRTE_ERR_OUT_OF_RESOURCE;
if (PMIX_CHECK_PROCID(&proct->name, dst_name)) {
/* did they direct that the data go to this proc? */
if (NULL == proct->stdinev) {
/* nope - ignore it */
continue;
}
}
} else {
PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s sending %d bytes from stdinev to daemon %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->stdinev->daemon)));

/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
* we pass sink->name to indicate who is to
* receive the data. If the connection closed,
* numbytes will be zero so zero bytes will be
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
if (PRTE_SUCCESS
!= (rc = prte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon,
&proct->stdinev->name, PRTE_IOF_STDIN, data,
sz))) {
/* if the addressee is unknown, remove the sink from the list */
if (PRTE_ERR_ADDRESSEE_UNKNOWN == rc) {
PMIX_RELEASE(proct->stdinev);

/* if the daemon is me, then this is a local sink */
if (PMIX_CHECK_PROCID(PRTE_PROC_MY_NAME, &proct->stdinev->daemon)) {
PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s read %d bytes from stdin - writing to %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->name)));
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdinev->wev) {
if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name,
PRTE_IOF_STDIN, data, sz,
proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */

PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"buffer backed up - holding"));
return PRTE_ERR_OUT_OF_RESOURCE;
}
}
} else {
PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s sending %d bytes from stdinev to daemon %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->stdinev->daemon)));

/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
* we pass sink->name to indicate who is to
* receive the data. If the connection closed,
* numbytes will be zero so zero bytes will be
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
if (PRTE_SUCCESS
!= (rc = prte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon,
&proct->stdinev->name, PRTE_IOF_STDIN, data,
sz))) {
/* if the addressee is unknown, remove the sink from the list */
if (PRTE_ERR_ADDRESSEE_UNKNOWN == rc) {
PMIX_RELEASE(proct->stdinev);
}
}
}
}
}
Expand Down Expand Up @@ -409,8 +404,9 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
PMIX_ACQUIRE_OBJECT(sink);

PRTE_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s hnp:stdin:write:handler writing data to %d",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), wev->fd));
"%s hnp:stdin:write:handler writing %d data to %d",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME),
(int)pmix_list_get_size(&wev->outputs), wev->fd));

wev->pending = false;

Expand Down
12 changes: 8 additions & 4 deletions src/prted/pmix/pmix_server_gen.c
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,10 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p
prte_grpcomm_signature_t *sig;
pmix_proc_t *proct;

prte_output_verbose(2, prte_pmix_server_globals.output, "%s job control request from %s:%d",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), requestor->nspace, requestor->rank);
prte_output_verbose(2, prte_pmix_server_globals.output,
"%s job control request from %s:%d",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME),
requestor->nspace, requestor->rank);

for (m = 0; m < ndirs; m++) {
if (0 == strncmp(directives[m].key, PMIX_JOB_CTRL_KILL, PMIX_MAX_KEYLEN)) {
Expand Down Expand Up @@ -1090,8 +1092,10 @@ static void pmix_server_stdin_push(int sd, short args, void *cbdata)
size_t n;

for (n = 0; n < cd->nprocs; n++) {
PRTE_OUTPUT_VERBOSE((1, prte_debug_output, "%s pmix_server_stdin_push to dest %s: size %zu",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), PRTE_NAME_PRINT(&cd->procs[n]),
PRTE_OUTPUT_VERBOSE((1, prte_pmix_server_globals.output,
"%s pmix_server_stdin_push to dest %s: size %zu",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME),
PRTE_NAME_PRINT(&cd->procs[n]),
bo->size));
prte_iof.push_stdin(&cd->procs[n], (uint8_t *) bo->bytes, bo->size);
}
Expand Down
38 changes: 26 additions & 12 deletions src/tools/prte/prte.c
Original file line number Diff line number Diff line change
Expand Up @@ -1129,19 +1129,33 @@ int main(int argc, char *argv[])
prte_output(0, "JOB %s EXECUTING", PRTE_JOBID_PRINT(spawnednspace));
}

/* push our stdin to the apps */
PMIX_LOAD_PROCID(&pname, spawnednspace, 0); // forward stdin to rank=0
PMIX_INFO_CREATE(iptr, 1);
PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL);
PRTE_PMIX_CONSTRUCT_LOCK(&lock);
ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock);
if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) {
prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret));
} else if (PMIX_SUCCESS == ret) {
PRTE_PMIX_WAIT_THREAD(&lock);
/* check what user wants us to do with stdin */
PMIX_LOAD_NSPACE(pname.nspace, spawnednspace);
opt = pmix_cmd_line_get_param(&results, PRTE_CLI_STDIN);
if (NULL != opt) {
if (0 == strcmp(opt->values[0], "all")) {
pname.rank = PMIX_RANK_WILDCARD;
} else if (0 == strcmp(opt->values[0], "none")) {
pname.rank = PMIX_RANK_INVALID;
} else {
pname.rank = 0;
}
} else {
pname.rank = 0;
}
if (PMIX_RANK_INVALID != pname.rank) {
PMIX_INFO_CREATE(iptr, 1);
PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL);
PRTE_PMIX_CONSTRUCT_LOCK(&lock);
ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock);
if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) {
prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret));
} else if (PMIX_SUCCESS == ret) {
PRTE_PMIX_WAIT_THREAD(&lock);
}
PRTE_PMIX_DESTRUCT_LOCK(&lock);
PMIX_INFO_FREE(iptr, 1);
}
PRTE_PMIX_DESTRUCT_LOCK(&lock);
PMIX_INFO_FREE(iptr, 1);

proceed:
/* loop the event lib until an exit event is detected */
Expand Down
38 changes: 26 additions & 12 deletions src/tools/prun/prun.c
Original file line number Diff line number Diff line change
Expand Up @@ -958,19 +958,33 @@ int prun(int argc, char *argv[])
PRTE_PMIX_DESTRUCT_LOCK(&lock);
PMIX_INFO_FREE(iptr, 2);

/* push our stdin to the apps */
PMIX_LOAD_PROCID(&pname, spawnednspace, 0); // forward stdin to rank=0
PMIX_INFO_CREATE(iptr, 1);
PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL);
PRTE_PMIX_CONSTRUCT_LOCK(&lock);
ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock);
if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) {
prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret));
} else if (PMIX_SUCCESS == ret) {
PRTE_PMIX_WAIT_THREAD(&lock);
/* check what user wants us to do with stdin */
PMIX_LOAD_NSPACE(pname.nspace, spawnednspace);
opt = pmix_cmd_line_get_param(&results, PRTE_CLI_STDIN);
if (NULL != opt) {
if (0 == strcmp(opt->values[0], "all")) {
pname.rank = PMIX_RANK_WILDCARD;
} else if (0 == strcmp(opt->values[0], "none")) {
pname.rank = PMIX_RANK_INVALID;
} else {
pname.rank = 0;
}
} else {
pname.rank = 0;
}
if (PMIX_RANK_INVALID != pname.rank) {
PMIX_INFO_CREATE(iptr, 1);
PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL);
PRTE_PMIX_CONSTRUCT_LOCK(&lock);
ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock);
if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) {
prte_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret));
} else if (PMIX_SUCCESS == ret) {
PRTE_PMIX_WAIT_THREAD(&lock);
}
PRTE_PMIX_DESTRUCT_LOCK(&lock);
PMIX_INFO_FREE(iptr, 1);
}
PRTE_PMIX_DESTRUCT_LOCK(&lock);
PMIX_INFO_FREE(iptr, 1);

/* register to be notified when
* our job completes */
Expand Down

0 comments on commit f484c2f

Please sign in to comment.