From 9db9fdbb1b401d0cebc269d8cb9e3d0dac440070 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 26 Oct 2023 07:22:43 -0600 Subject: [PATCH] Continue cleanup of pseudo-scheduler daemon Let it build by default for simplicity. Cleanup the backend operations to remove things that aren't directly part of the scheduler. Begin implementation of the sched state machine. Signed-off-by: Ralph Castain --- config/prte_configure_options.m4 | 12 +- src/mca/state/base/state_base_fns.c | 5 +- src/prted/pmix/pmix_server.c | 1 + src/tools/psched/backend.c | 537 ++++++---------------------- src/tools/psched/psched.c | 1 + src/tools/psched/psched.h | 137 ++++++- src/tools/psched/scheduler.c | 127 +++++++ src/tools/psched/server.c | 52 ++- src/tools/psched/session.c | 223 ++---------- src/tools/psched/state.c | 224 +++++++++++- 10 files changed, 619 insertions(+), 700 deletions(-) diff --git a/config/prte_configure_options.m4 b/config/prte_configure_options.m4 index 616d677437..ffbef0547b 100644 --- a/config/prte_configure_options.m4 +++ b/config/prte_configure_options.m4 @@ -383,15 +383,15 @@ AC_DEFINE_UNQUOTED([PRTE_ENABLE_GETPWUID], [$prte_want_getpwuid], AC_MSG_CHECKING([if want to install PRRTE pseudo-scheduler]) AC_ARG_WITH(prte-scheduler, AS_HELP_STRING([--with-prte-scheduler], - [Normal PRTE users/applications do not need this. Users/applications wishing to explore dynamic allocation support probably do (default: disabled).])) -if test "$with_prte_scheduler" = "yes"; then - AC_MSG_RESULT([yes]) - prte_want_scheduler="yes" - WANT_PRTE_SCHED=1 -else + [Normal PRTE users/applications do not need this. 
Users/applications wishing to explore dynamic allocation support probably do (default: enabled).])) +if test "$with_prte_scheduler" = "no"; then AC_MSG_RESULT([no]) prte_want_scheduler="no" WANT_PRTE_SCHED=0 +else + AC_MSG_RESULT([yes]) + prte_want_scheduler="yes" + WANT_PRTE_SCHED=1 fi AM_CONDITIONAL(WANT_PRTE_SCHED, test "$WANT_PRTE_SCHED" = 1) PRTE_SUMMARY_ADD([Miscellaneous], [PRTE Pseudo-Scheduler], [], [$prte_want_scheduler]) diff --git a/src/mca/state/base/state_base_fns.c b/src/mca/state/base/state_base_fns.c index 6a547eaee2..b36a1923c5 100644 --- a/src/mca/state/base/state_base_fns.c +++ b/src/mca/state/base/state_base_fns.c @@ -114,13 +114,10 @@ void prte_state_base_activate_job_state(prte_job_t *jdata, prte_job_state_t stat int prte_state_base_add_job_state(prte_job_state_t state, prte_state_cbfunc_t cbfunc, int priority) { - pmix_list_item_t *item; prte_state_t *st; /* check for uniqueness */ - for (item = pmix_list_get_first(&prte_job_states); item != pmix_list_get_end(&prte_job_states); - item = pmix_list_get_next(item)) { - st = (prte_state_t *) item; + PMIX_LIST_FOREACH(st, &prte_job_states, prte_state_t) { if (st->job_state == state) { PMIX_OUTPUT_VERBOSE((1, prte_state_base_framework.framework_output, "DUPLICATE STATE DEFINED: %s", prte_job_state_to_str(state))); diff --git a/src/prted/pmix/pmix_server.c b/src/prted/pmix/pmix_server.c index e417aa06d9..cb46f601e5 100644 --- a/src/prted/pmix/pmix_server.c +++ b/src/prted/pmix/pmix_server.c @@ -968,6 +968,7 @@ void pmix_server_finalize(void) PMIX_LIST_DESTRUCT(&prte_pmix_server_globals.notifications); PMIX_LIST_DESTRUCT(&prte_pmix_server_globals.psets); PMIX_LIST_DESTRUCT(&prte_pmix_server_globals.groups); + PMIX_LIST_DESTRUCT(&prte_pmix_server_globals.tools); /* shutdown the local server */ prte_pmix_server_globals.initialized = false; diff --git a/src/tools/psched/backend.c b/src/tools/psched/backend.c index 32ef637d05..e4c206fd3f 100644 --- a/src/tools/psched/backend.c +++ 
b/src/tools/psched/backend.c @@ -33,10 +33,12 @@ #ifdef HAVE_UNISTD_H # include #endif +#include #include "src/hwloc/hwloc-internal.h" #include "src/pmix/pmix-internal.h" #include "src/util/pmix_argv.h" +#include "src/util/pmix_os_dirpath.h" #include "src/util/pmix_output.h" #include "src/mca/errmgr/errmgr.h" @@ -59,6 +61,86 @@ #include "src/prted/pmix/pmix_server_internal.h" #include "src/tools/psched/psched.h" +static void opcbfunc(pmix_status_t status, void *cbdata) +{ + prte_pmix_lock_t *lock = (prte_pmix_lock_t *) cbdata; + + lock->status = prte_pmix_convert_status(status); + PRTE_PMIX_WAKEUP_THREAD(lock); +} + +/* add any info that the tool couldn't self-assign */ +static int register_tool(pmix_nspace_t nspace) +{ + void *ilist; + pmix_status_t ret; + char *tmp; + pmix_data_array_t darray; + pmix_info_t *iptr; + size_t ninfo; + prte_pmix_lock_t lock; + int rc; + prte_pmix_tool_t *tl; + + PMIX_INFO_LIST_START(ilist); + + PMIX_INFO_LIST_ADD(ret, ilist, PMIX_TMPDIR, + prte_process_info.jobfam_session_dir, PMIX_STRING); + + /* create and pass a job-level session directory */ + if (0 > pmix_asprintf(&tmp, "%s/%u", prte_process_info.jobfam_session_dir, + PRTE_LOCAL_JOBID(nspace))) { + PRTE_ERROR_LOG(PRTE_ERR_OUT_OF_RESOURCE); + return PRTE_ERR_OUT_OF_RESOURCE; + } + rc = pmix_os_dirpath_create(tmp, S_IRWXU); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + free(tmp); + rc = prte_pmix_convert_status(rc); + return rc; + } + PMIX_INFO_LIST_ADD(ret, ilist, PMIX_NSDIR, tmp, PMIX_STRING); + + /* record this tool */ + tl = PMIX_NEW(prte_pmix_tool_t); + PMIX_LOAD_PROCID(&tl->name, nspace, 0); + tl->nsdir = tmp; + pmix_list_append(&psched_globals.tools, &tl->super); + + /* pass it down */ + PMIX_INFO_LIST_CONVERT(ret, ilist, &darray); + if (PMIX_ERR_EMPTY == ret) { + iptr = NULL; + ninfo = 0; + } else if (PMIX_SUCCESS != ret) { + PMIX_ERROR_LOG(ret); + rc = prte_pmix_convert_status(ret); + PMIX_INFO_LIST_RELEASE(ilist); + return rc; + } else { + iptr = (pmix_info_t *) 
darray.array; + ninfo = darray.size; + } + PMIX_INFO_LIST_RELEASE(ilist); + + PRTE_PMIX_CONSTRUCT_LOCK(&lock); + ret = PMIx_server_register_nspace(nspace, 1, iptr, ninfo, + opcbfunc, &lock); + if (PMIX_SUCCESS != ret) { + PMIX_ERROR_LOG(ret); + rc = prte_pmix_convert_status(ret); + PMIX_INFO_FREE(iptr, ninfo); + PRTE_PMIX_DESTRUCT_LOCK(&lock); + return rc; + } + PRTE_PMIX_WAIT_THREAD(&lock); + rc = lock.status; + PRTE_PMIX_DESTRUCT_LOCK(&lock); + PMIX_INFO_FREE(iptr, ninfo); + return rc; +} + static void _toolconn(int sd, short args, void *cbdata) { pmix_server_req_t *cd = (pmix_server_req_t *) cbdata; @@ -111,7 +193,7 @@ static void _toolconn(int sd, short args, void *cbdata) cd->cmdline = strdup(cd->info[n].value.data.string); } else if (PMIX_CHECK_KEY(&cd->info[n], PMIX_LAUNCHER)) { cd->launcher = PMIX_INFO_TRUE(&cd->info[n]); - } else if (PMIX_CHECK_KEY(&cd->info[n], PMIX_SERVER_SCHEDULER)) { + } else if (PMIX_CHECK_KEY(&cd->info[n], PMIX_SERVER_SYS_CONTROLLER)) { cd->scheduler = PMIX_INFO_TRUE(&cd->info[n]); } else if (PMIX_CHECK_KEY(&cd->info[n], PMIX_PROC_PID)) { PMIX_VALUE_GET_NUMBER(xrc, &cd->info[n].value, cd->pid, pid_t); @@ -129,74 +211,33 @@ static void _toolconn(int sd, short args, void *cbdata) pmix_output_verbose(2, prte_pmix_server_globals.output, "%s %s CONNECTION FROM UID %d GID %d NSPACE %s", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), - cd->launcher ? "LAUNCHER" : (cd->scheduler ? "SCHEDULER" : "TOOL"), + cd->launcher ? "LAUNCHER" : (cd->scheduler ? 
"SYSTEM CONTROLLER" : "TOOL"), cd->uid, cd->gid, cd->target.nspace); - /* if this is the scheduler and we are not the DVM master, then - * this is not allowed */ if (cd->scheduler) { - if (!PRTE_PROC_IS_MASTER) { - cd->toolcbfunc(PMIX_ERR_NOT_SUPPORTED, NULL, cd->cbdata); - PMIX_RELEASE(cd); - return; - } else { - /* mark that the scheduler has attached to us */ - prte_pmix_server_globals.scheduler_connected = true; - PMIX_LOAD_PROCID(&prte_pmix_server_globals.scheduler, - cd->target.nspace, cd->target.rank); - /* we cannot immediately set the scheduler to be our - * PMIx server as the PMIx library hasn't finished - * recording it */ - } + /* mark that the system controller has attached to us */ + psched_globals.controller_connected = true; + PMIX_LOAD_PROCID(&psched_globals.syscontroller, + cd->target.nspace, cd->target.rank); + /* we cannot immediately set the system controller to be our + * PMIx server as the PMIx library hasn't finished + * recording it */ } - /* if we are not the HNP or master, and the tool doesn't - * already have a self-assigned name, then - * we need to ask the master for one */ + /* if the tool doesn't already have a self-assigned name, then + * there isn't much we can do about it */ + xrc = PMIX_SUCCESS; if (PMIX_NSPACE_INVALID(cd->target.nspace) || PMIX_RANK_INVALID == cd->target.rank) { - /* if we are the HNP, we can directly assign the jobid */ - if (PRTE_PROC_IS_MASTER) { - /* the new nspace is our base nspace with an "@N" extension */ - pmix_asprintf(&tmp, "%s@%u", prte_plm_globals.base_nspace, prte_plm_globals.next_jobid); - PMIX_LOAD_PROCID(&cd->target, tmp, 0); - free(tmp); - prte_plm_globals.next_jobid++; - } else { - cd->local_index = pmix_pointer_array_add(&prte_pmix_server_globals.local_reqs, cd); - /* we need to send this to the HNP for a jobid */ - PMIX_DATA_BUFFER_CREATE(buf); - rc = PMIx_Data_pack(NULL, buf, &command, 1, PMIX_UINT8); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - rc = PMIx_Data_pack(NULL, buf, 
&cd->local_index, 1, PMIX_INT); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - /* send it to the HNP for processing - might be myself! */ - PRTE_RML_SEND(rc, PRTE_PROC_MY_HNP->rank, - buf, PRTE_RML_TAG_PLM); - if (PRTE_SUCCESS != rc) { - PRTE_ERROR_LOG(rc); - xrc = prte_pmix_convert_rc(rc); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, cd->local_index, NULL); - PMIX_DATA_BUFFER_RELEASE(buf); - if (NULL != cd->toolcbfunc) { - cd->toolcbfunc(xrc, NULL, cd->cbdata); - } - PMIX_RELEASE(cd); - } - return; + xrc = PMIX_ERR_BAD_PARAM; + } else { + /* the tool is not a client of ours, but we can provide at least some information */ + rc = register_tool(cd->target.nspace); + if (PRTE_SUCCESS != rc) { + xrc = prte_pmix_convert_rc(rc); } } - - /* the tool is not a client of ours, but we can provide at least some information */ - rc = prte_pmix_server_register_tool(cd->target.nspace); - if (PMIX_SUCCESS != rc) { - rc = prte_pmix_convert_rc(rc); - } if (NULL != cd->toolcbfunc) { - cd->toolcbfunc(rc, &cd->target, cd->cbdata); + cd->toolcbfunc(xrc, &cd->target, cd->cbdata); } PMIX_RELEASE(cd); } @@ -225,110 +266,6 @@ void psched_tool_connected_fn(pmix_info_t *info, size_t ninfo, prte_event_active(&(cd->ev), PRTE_EV_WRITE, 1); } -static void lgcbfn(int sd, short args, void *cbdata) -{ - prte_pmix_server_op_caddy_t *cd = (prte_pmix_server_op_caddy_t *) cbdata; - PRTE_HIDE_UNUSED_PARAMS(sd, args); - - if (NULL != cd->cbfunc) { - cd->cbfunc(cd->status, cd->cbdata); - } - PMIX_RELEASE(cd); -} - -void psched_log_fn(const pmix_proc_t *client, - const pmix_info_t data[], size_t ndata, - const pmix_info_t directives[], size_t ndirs, - pmix_op_cbfunc_t cbfunc, void *cbdata) -{ - size_t n, cnt, dcnt; - pmix_data_buffer_t *buf; - int rc = PRTE_SUCCESS; - pmix_data_buffer_t pbuf, dbuf; - pmix_byte_object_t pbo, dbo; - pmix_status_t ret; - - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s logging info", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - - 
PMIX_DATA_BUFFER_CONSTRUCT(&dbuf); - /* if we are the one that passed it down, then we don't pass it back */ - dcnt = 0; - for (n = 0; n < ndirs; n++) { - if (PMIX_CHECK_KEY(&directives[n], "prte.log.noloop")) { - if (PMIX_INFO_TRUE(&directives[n])) { - rc = PMIX_SUCCESS; - goto done; - } - } - else { - ret = PMIx_Data_pack(NULL, &dbuf, (pmix_info_t *) &directives[n], 1, PMIX_INFO); - if (PMIX_SUCCESS != ret) { - PMIX_ERROR_LOG(ret); - } - dcnt++; - } - } - - PMIX_DATA_BUFFER_CONSTRUCT(&pbuf); - cnt = 0; - - for (n = 0; n < ndata; n++) { - /* ship this to our HNP/MASTER for processing, even if that is us */ - ret = PMIx_Data_pack(NULL, &pbuf, (pmix_info_t *) &data[n], 1, PMIX_INFO); - if (PMIX_SUCCESS != ret) { - PMIX_ERROR_LOG(ret); - } - ++cnt; - } - if (0 < cnt) { - PMIX_DATA_BUFFER_CREATE(buf); - /* pack the source of this log request */ - rc = PMIx_Data_pack(NULL, buf, (void*)client, 1, PMIX_PROC); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - /* pack number of info provided */ - rc = PMIx_Data_pack(NULL, buf, &cnt, 1, PMIX_SIZE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - /* pack number of directives given */ - rc = PMIx_Data_pack(NULL, buf, &dcnt, 1, PMIX_SIZE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - /* bring over the packed info blob */ - rc = PMIx_Data_unload(&pbuf, &pbo); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - rc = PMIx_Data_pack(NULL, buf, &pbo, 1, PMIX_BYTE_OBJECT); - PMIX_BYTE_OBJECT_DESTRUCT(&pbo); - /* pack the directives blob */ - rc = PMIx_Data_unload(&dbuf, &dbo); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - rc = PMIx_Data_pack(NULL, buf, &dbo, 1, PMIX_BYTE_OBJECT); - PMIX_BYTE_OBJECT_DESTRUCT(&dbo); - /* send the result to the HNP */ - PRTE_RML_SEND(rc, PRTE_PROC_MY_HNP->rank, buf, - PRTE_RML_TAG_LOGGING); - if (PRTE_SUCCESS != rc) { - PRTE_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - } - } - -done: - /* we cannot directly execute the callback here - * as it would threadlock 
- so shift to somewhere - * safe */ - PRTE_SERVER_PMIX_THREADSHIFT(PRTE_NAME_WILDCARD, NULL, rc, NULL, NULL, 0, lgcbfn, cbfunc, cbdata); -} - pmix_status_t psched_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_proc_t targets[], size_t ntargets, const pmix_info_t directives[], size_t ndirs, @@ -460,275 +397,3 @@ pmix_status_t psched_job_ctrl_fn(const pmix_proc_t *requestor, return PMIX_OPERATION_SUCCEEDED; } - -static void relcb(void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; - - if (NULL != cd->info) { - PMIX_INFO_FREE(cd->info, cd->ninfo); - } - PMIX_RELEASE(cd); -} -static void group_release(int status, pmix_data_buffer_t *buf, void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; - int32_t cnt; - int rc = PRTE_SUCCESS; - pmix_status_t ret; - bool assignedID = false; - bool procsadded = false; - size_t cid; - pmix_proc_t *procs, *members; - size_t n, num_members; - pmix_data_array_t darray; - pmix_info_t info; - pmix_data_buffer_t dbuf; - pmix_byte_object_t bo; - int32_t byused; - pmix_server_pset_t *pset; - - PMIX_ACQUIRE_OBJECT(cd); - - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request complete", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - - if (PRTE_SUCCESS != status) { - rc = status; - goto complete; - } - - /* if this was a destruct operation, then there is nothing - * further we need do */ - if (PMIX_GROUP_DESTRUCT == cd->op) { - /* find this group ID on our list of groups */ - PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) - { - if (0 == strcmp(pset->name, cd->grpid)) { - pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); - PMIX_RELEASE(pset); - break; - } - } - rc = status; - goto complete; - } - - /* check for any directives */ - cnt = 1; - rc = PMIx_Data_unpack(NULL, buf, &bo, &cnt, PMIX_BYTE_OBJECT); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto complete; - } - PMIX_DATA_BUFFER_CONSTRUCT(&dbuf); - 
PMIX_DATA_BUFFER_LOAD(&dbuf, bo.bytes, bo.size); - - cd->ninfo = 2; - cnt = 1; - rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO); - while (PMIX_SUCCESS == rc) { - if (PMIX_CHECK_KEY(&info, PMIX_GROUP_CONTEXT_ID)) { - PMIX_VALUE_GET_NUMBER(rc, &info.value, cid, size_t); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - cd->ninfo = 0; - PMIX_DATA_BUFFER_DESTRUCT(&dbuf); - goto complete; - } - assignedID = true; - cd->ninfo++; - } else if (PMIX_CHECK_KEY(&info, PMIX_GROUP_ADD_MEMBERS)) { - members = (pmix_proc_t*)info.value.data.darray->array; - num_members = info.value.data.darray->size; - PMIX_PROC_CREATE(procs, cd->nprocs + num_members); - for (n=0; n < cd->nprocs; n++) { - PMIX_XFER_PROCID(&procs[n], &cd->procs[n]); - } - for (n=0; n < num_members; n++) { - PMIX_XFER_PROCID(&procs[n+cd->nprocs], &members[n]); - } - PMIX_PROC_FREE(cd->procs, cd->nprocs); - cd->procs = procs; - cd->nprocs += num_members; - procsadded = true; - } - /* cleanup */ - PMIX_INFO_DESTRUCT(&info); - /* get the next object */ - cnt = 1; - rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO); - } - PMIX_DATA_BUFFER_DESTRUCT(&dbuf); - /* the unpacking loop will have ended when the unpack either - * went past the end of the buffer */ - if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - PMIX_ERROR_LOG(rc); - goto complete; - } - rc = PMIX_SUCCESS; - - if (PMIX_GROUP_CONSTRUCT == cd->op) { - /* add it to our list of known groups */ - pset = PMIX_NEW(pmix_server_pset_t); - pset->name = strdup(cd->grpid); - pset->num_members = cd->nprocs; - PMIX_PROC_CREATE(pset->members, pset->num_members); - memcpy(pset->members, cd->procs, cd->nprocs * sizeof(pmix_proc_t)); - pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); - } - - /* if anything is left in the buffer, then it is - * modex data that needs to be stored */ - PMIX_BYTE_OBJECT_CONSTRUCT(&bo); - byused = buf->bytes_used - (buf->unpack_ptr - buf->base_ptr); - if (0 < byused) { - bo.bytes = buf->unpack_ptr; - 
bo.size = byused; - } - if (NULL != bo.bytes && 0 < bo.size) { - cd->ninfo++; - } - - PMIX_INFO_CREATE(cd->info, cd->ninfo); - n = 0; - // pass back the final group membership - darray.type = PMIX_PROC; - darray.array = cd->procs; - darray.size = cd->nprocs; - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY); - PMIX_PROC_FREE(cd->procs, cd->nprocs); - ++n; - if (assignedID) { - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_CONTEXT_ID, &cid, PMIX_SIZE); - ++n; - } - if (NULL != bo.bytes && 0 < bo.size) { - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_ENDPT_DATA, &bo, PMIX_BYTE_OBJECT); - } - -complete: - ret = prte_pmix_convert_rc(rc); - /* return to the local procs in the collective */ - if (NULL != cd->infocbfunc) { - cd->infocbfunc(ret, cd->info, cd->ninfo, cd->cbdata, relcb, cd); - } else { - if (NULL != cd->info) { - PMIX_INFO_FREE(cd->info, cd->ninfo); - } - PMIX_RELEASE(cd); - } -} - -pmix_status_t psched_group_fn(pmix_group_operation_t op, char *grpid, - const pmix_proc_t procs[], size_t nprocs, - const pmix_info_t directives[], size_t ndirs, - pmix_info_cbfunc_t cbfunc, void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd; - int rc; - size_t i; - bool assignID = false; - pmix_server_pset_t *pset; - bool fence = false; - bool force_local = false; - pmix_byte_object_t *bo = NULL; - struct timeval tv = {0, 0}; - - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request recvd", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - - /* they are required to pass us an id */ - if (NULL == grpid) { - return PMIX_ERR_BAD_PARAM; - } - - /* check the directives */ - for (i = 0; i < ndirs; i++) { - /* see if they want a context id assigned */ - if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ASSIGN_CONTEXT_ID)) { - assignID = PMIX_INFO_TRUE(&directives[i]); - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_EMBED_BARRIER)) { - fence = PMIX_INFO_TRUE(&directives[i]); - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ENDPT_DATA)) { - bo = 
(pmix_byte_object_t *) &directives[i].value.data.bo; - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_TIMEOUT)) { - tv.tv_sec = directives[i].value.data.uint32; - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) { - force_local = PMIX_INFO_TRUE(&directives[i]); - } - } - - /* if they don't want us to do a fence and they don't want a - * context id assigned, or they insist on forcing local - * completion of the operation, then we are done */ - if ((!fence && !assignID) || force_local) { - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request - purely local", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - if (PMIX_GROUP_CONSTRUCT == op) { - /* add it to our list of known groups */ - pset = PMIX_NEW(pmix_server_pset_t); - pset->name = strdup(grpid); - pset->num_members = nprocs; - PMIX_PROC_CREATE(pset->members, pset->num_members); - memcpy(pset->members, procs, nprocs * sizeof(pmix_proc_t)); - pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); - } else if (PMIX_GROUP_DESTRUCT == op) { - /* find this group ID on our list of groups */ - PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) - { - if (0 == strcmp(pset->name, grpid)) { - pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); - PMIX_RELEASE(pset); - break; - } - } - } - return PMIX_OPERATION_SUCCEEDED; - } - - cd = PMIX_NEW(prte_pmix_mdx_caddy_t); - cd->grpid = grpid; - cd->op = op; - /* have to copy the procs in case we add members */ - PMIX_PROC_CREATE(cd->procs, nprocs); - memcpy(cd->procs, procs, nprocs * sizeof(pmix_proc_t)); - cd->nprocs = nprocs; - cd->grpcbfunc = group_release; - cd->infocbfunc = cbfunc; - cd->cbdata = cbdata; - - /* compute the signature of this collective */ - if (NULL != procs) { - cd->sig = PMIX_NEW(prte_grpcomm_signature_t); - cd->sig->sz = nprocs; - cd->sig->signature = (pmix_proc_t *) malloc(cd->sig->sz * sizeof(pmix_proc_t)); - memcpy(cd->sig->signature, procs, cd->sig->sz * 
sizeof(pmix_proc_t)); - } - /* setup the ctrls blob - this will include any "add_members" directive */ - rc = prte_pack_ctrl_options(&cd->ctrls, directives, ndirs); - if (PMIX_SUCCESS != rc) { - PMIX_RELEASE(cd); - return rc; - } - PMIX_DATA_BUFFER_CREATE(cd->buf); - /* if they provided us with a data blob, send it along */ - if (NULL != bo) { - /* We don't own the byte_object and so we have to - * copy it here */ - rc = PMIx_Data_embed(cd->buf, bo); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - } - /* pass it to the global collective algorithm */ - if (PRTE_SUCCESS != (rc = prte_grpcomm.allgather(cd))) { - PRTE_ERROR_LOG(rc); - PMIX_RELEASE(cd); - return PMIX_ERROR; - } - return PMIX_SUCCESS; -} diff --git a/src/tools/psched/psched.c b/src/tools/psched/psched.c index 9290b01d12..6573aaf8ca 100644 --- a/src/tools/psched/psched.c +++ b/src/tools/psched/psched.c @@ -312,6 +312,7 @@ int main(int argc, char *argv[]) /* pre-load any default mca param files */ prte_preload_default_mca_params(); + psched_register_params(); /* Register all MCA Params */ if (PRTE_SUCCESS != (ret = prte_register_params())) { diff --git a/src/tools/psched/psched.h b/src/tools/psched/psched.h index ac5215ac50..5c9cb69b29 100644 --- a/src/tools/psched/psched.h +++ b/src/tools/psched/psched.h @@ -30,6 +30,7 @@ #include "src/class/pmix_list.h" #include "src/class/pmix_pointer_array.h" #include "src/mca/schizo/schizo.h" +#include "src/mca/state/state.h" BEGIN_C_DECLS @@ -46,13 +47,14 @@ typedef struct { extern psched_globals_t psched_globals; -void psched_schizo_init(void); -void psched_state_init(void); -void psched_errmgr_init(void); -int psched_server_init(pmix_cli_result_t *results); -void psched_server_finalize(void); -void psched_scheduler_init(void); -void psched_scheduler_finalize(void); +extern void psched_schizo_init(void); +extern void psched_state_init(void); +extern void psched_errmgr_init(void); +extern int psched_server_init(pmix_cli_result_t *results); +extern void 
psched_server_finalize(void); +extern void psched_scheduler_init(void); +extern void psched_scheduler_finalize(void); +extern void psched_register_params(void); extern pmix_status_t psched_register_events_fn(pmix_status_t *codes, size_t ncodes, const pmix_info_t info[], size_t ninfo, @@ -76,21 +78,11 @@ extern pmix_status_t psched_query_fn(pmix_proc_t *proct, extern void psched_tool_connected_fn(pmix_info_t *info, size_t ninfo, pmix_tool_connection_cbfunc_t cbfunc, void *cbdata); -extern void psched_log_fn(const pmix_proc_t *client, - const pmix_info_t data[], size_t ndata, - const pmix_info_t directives[], size_t ndirs, - pmix_op_cbfunc_t cbfunc, void *cbdata); - extern pmix_status_t psched_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_proc_t targets[], size_t ntargets, const pmix_info_t directives[], size_t ndirs, pmix_info_cbfunc_t cbfunc, void *cbdata); -extern pmix_status_t psched_group_fn(pmix_group_operation_t op, char *gpid, - const pmix_proc_t procs[], size_t nprocs, - const pmix_info_t directives[], size_t ndirs, - pmix_info_cbfunc_t cbfunc, void *cbdata); - extern pmix_status_t psched_alloc_fn(const pmix_proc_t *client, pmix_alloc_directive_t directive, const pmix_info_t data[], size_t ndata, @@ -103,7 +95,118 @@ extern pmix_status_t psched_session_ctrl_fn(const pmix_proc_t *requestor, pmix_info_cbfunc_t cbfunc, void *cbdata); #endif +// global objects extern prte_schizo_base_module_t psched_schizo_module; +extern pmix_list_t prte_psched_states; + +typedef int32_t prte_sched_state_t; +#define PSCHED_STATE_ANY INT32_MAX +#define PSCHED_STATE_UNDEF 0 +#define PSCHED_STATE_INIT 1 +#define PSCHED_STATE_QUEUE 2 +#define PSCHED_STATE_SESSION_COMPLETE 30 + +/* Define a boundary so we can easily and quickly determine + * if a scheduler operation abnormally terminated - leave a little room + * for future expansion + */ +#define PSCHED_STATE_ERROR 50 + +typedef struct { + pmix_list_item_t super; + prte_sched_state_t sched_state; + prte_state_cbfunc_t 
cbfunc; +} psched_state_t; +PRTE_EXPORT PMIX_CLASS_DECLARATION(psched_state_t); + +PRTE_EXPORT extern pmix_list_t prte_psched_states; + +/* track a session throughout its lifecycle */ +typedef struct { + /** Base object so this can be put on a list */ + pmix_list_item_t super; + prte_event_t ev; + // allocation request info + pmix_proc_t requestor; + pmix_alloc_directive_t directive; + // original info keys + pmix_info_t *data; + size_t ndata; + // callback upon completion + pmix_info_cbfunc_t cbfunc; + void *cbdata; + // processed directives + char *user_refid; + char *alloc_refid; + uint64_t num_nodes; + char *nlist; + char *exclude; + uint64_t num_cpus; + char *ncpulist; + char *cpulist; + float memsize; + char *time; + char *queue; + bool preemptible; + char *lend; + char *image; + bool waitall; + bool share; + bool noshell; + char *dependency; + char *begintime; + // internal tracking info + prte_sched_state_t state; + // assigned session info + uint32_t sessionID; +} psched_req_t; +PRTE_EXPORT PMIX_CLASS_DECLARATION(psched_req_t); + +extern const char* prte_sched_state_to_str(prte_sched_state_t s); +// scheduler operations +extern void psched_activate_sched_state(psched_req_t *req, prte_sched_state_t state); +extern void psched_request_init(int fd, short args, void *cbdata); +extern void psched_request_queue(int fd, short args, void *cbdata); +extern void psched_session_complete(int fd, short args, void *cbdata); + + +#define PRTE_ACTIVATE_SCHED_STATE(j, s) \ + do { \ + psched_req_t *shadow = (j); \ + if (psched_globals.verbosity > 0) { \ + double timestamp = 0.0; \ + PRTE_STATE_GET_TIMESTAMP(timestamp); \ + pmix_output_verbose(1, psched_globals.output, \ + "%s [%f] ACTIVATE SCHED %s STATE %s AT %s:%d", \ + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), timestamp, \ + (NULL == shadow->alloc_refid) ? 
"NO REFID" : shadow->alloc_refid, \ + prte_sched_state_to_str((s)), __FILE__, __LINE__); \ + } \ + psched_activate_sched_state(shadow, (s)); \ + } while (0); + +#define PRTE_REACHING_SCHED_STATE(j, s) \ + do { \ + psched_req_t *shadow = (j); \ + if (psched_globals.verbosity > 0) { \ + double timestamp = 0.0; \ + PRTE_STATE_GET_TIMESTAMP(timestamp); \ + pmix_output_verbose(1, psched_globals.output, \ + "%s [%f] ACTIVATING SCHED %s STATE %s AT %s:%d", \ + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), timestamp, \ + (NULL == shadow->alloc_refid) ? "NO REFID" : shadow->alloc_refid, \ + prte_sched_state_to_str((s)), __FILE__, __LINE__); \ + shadow->state = (s); \ + } \ + } while (0); + +#define PSCHED_THREADSHIFT(c, fn) \ + do { \ + prte_event_set(prte_event_base, &((c)->ev), -1, PRTE_EV_WRITE, (fn), (c)); \ + prte_event_set_priority(&((c)->ev), PRTE_MSG_PRI); \ + PMIX_POST_OBJECT(c); \ + prte_event_active(&((c)->ev), PRTE_EV_WRITE, 1); \ + } while (0); END_C_DECLS diff --git a/src/tools/psched/scheduler.c b/src/tools/psched/scheduler.c index 074363fa55..5572c30f10 100644 --- a/src/tools/psched/scheduler.c +++ b/src/tools/psched/scheduler.c @@ -52,3 +52,130 @@ void psched_scheduler_finalize(void) { return; } + +void psched_request_init(int fd, short args, void *cbdata) +{ + psched_req_t *req = (psched_req_t*)cbdata; + size_t n; + pmix_status_t rc, rcerr = PMIX_SUCCESS; + bool notwaiting = false; + + pmix_output_verbose(2, psched_globals.output, + "%s scheduler:psched: init request", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); + + // process the incoming directives + for (n=0; n < req->ndata; n++) { + if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_REQ_ID)) { + req->user_refid = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_ID)) { + req->alloc_refid = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_NUM_NODES)) { + PMIX_VALUE_GET_NUMBER(rc, &req->data[n].value, req->num_nodes, uint64_t); + if 
(PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + // track the first error + if (PMIX_SUCCESS == rcerr) { + rcerr = rc; + } + // continue processing as we may need some of the info + // when reporting back the error + } + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_NODE_LIST)) { + req->nlist = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_EXCLUDE)) { + req->exclude = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_NUM_CPUS)) { + PMIX_VALUE_GET_NUMBER(rc, &req->data[n].value, req->num_cpus, uint64_t); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + // track the first error + if (PMIX_SUCCESS == rcerr) { + rcerr = rc; + } + // continue processing as we may need some of the info + // when reporting back the error + } + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_NUM_CPU_LIST)) { + req->ncpulist = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_CPU_LIST)) { + req->cpulist = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_MEM_SIZE)) { + PMIX_VALUE_GET_NUMBER(rc, &req->data[n].value, req->memsize, float); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + // track the first error + if (PMIX_SUCCESS == rcerr) { + rcerr = rc; + } + // continue processing as we may need some of the info + // when reporting back the error + } + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_TIME)) { + req->time = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_QUEUE)) { + req->queue = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_PREEMPTIBLE)) { + req->preemptible = PMIx_Value_true(&req->data[n].value); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_LEND)) { + req->lend = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_IMAGE)) { + req->image 
= strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_WAIT_ALL_NODES)) { + req->waitall = PMIx_Value_true(&req->data[n].value); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_SHARE)) { + req->share = PMIx_Value_true(&req->data[n].value); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_NOSHELL)) { + req->noshell = PMIx_Value_true(&req->data[n].value); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_DEPENDENCY)) { + req->dependency = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_BEGIN)) { + req->begintime = strdup(req->data[n].value.data.string); + } else if (PMIX_CHECK_KEY(&req->data[n], PMIX_ALLOC_NOT_WAITING)) { + notwaiting = true; + } + } + if (notwaiting) { + // we callback with the current status so the requestor + // can be told if we are accepting the request + if (NULL != req->cbfunc) { + req->cbfunc(rcerr, NULL, 0, req->cbdata, NULL, NULL); + } + if (PMIX_SUCCESS == rcerr) { + // continue to next state + PRTE_ACTIVATE_SCHED_STATE(req, PSCHED_STATE_QUEUE); + } else { + PMIX_RELEASE(req); + } + } else if (PMIX_SUCCESS == rcerr) { + // move to next state + PRTE_ACTIVATE_SCHED_STATE(req, PSCHED_STATE_QUEUE); + } else { + // need to reply to requestor so they don't hang + if (NULL != req->cbfunc) { + req->cbfunc(rcerr, NULL, 0, req->cbdata, NULL, NULL); + } + // cannot continue processing the request + PMIX_RELEASE(req); + } + return; +} + + +void psched_request_queue(int fd, short args, void *cbdata) +{ + psched_req_t *req = (psched_req_t*)cbdata; + + pmix_output_verbose(2, psched_globals.output, + "%s scheduler:psched: queue request", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); +} + +void psched_session_complete(int fd, short args, void *cbdata) +{ + psched_req_t *req = (psched_req_t*)cbdata; + + pmix_output_verbose(2, psched_globals.output, + "%s scheduler:psched: session complete", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); +} diff --git 
a/src/tools/psched/server.c b/src/tools/psched/server.c index 749a88f01a..71bfb718bd 100644 --- a/src/tools/psched/server.c +++ b/src/tools/psched/server.c @@ -88,9 +88,7 @@ static pmix_server_module_t psched_server = { .notify_event = psched_notify_event, .query = psched_query_fn, .tool_connected = psched_tool_connected_fn, - .log = psched_log_fn, .job_control = psched_job_ctrl_fn, - .group = psched_group_fn, .allocate = psched_alloc_fn, #if PMIX_NUMERIC_VERSION >= 0x00050000 .session_control = psched_session_ctrl_fn @@ -210,22 +208,42 @@ static prte_regattr_input_t prte_attributes[] = { {.function = ""}, }; +static int gen_verbose = -1; + void psched_register_params(void) { + bool opened = false; + /* register a verbosity */ + psched_globals.verbosity = -1; (void) pmix_mca_base_var_register("prte", "psched", NULL, "verbose", "Debug verbosity for PRRTE Scheduler", PMIX_MCA_BASE_VAR_TYPE_INT, &psched_globals.verbosity); - if (0 <= psched_globals.verbosity) { + if (0 < psched_globals.verbosity) { psched_globals.output = pmix_output_open(NULL); pmix_output_set_verbosity(psched_globals.output, psched_globals.verbosity); prte_pmix_server_globals.output = pmix_output_open(NULL); pmix_output_set_verbosity(prte_pmix_server_globals.output, psched_globals.verbosity); + opened = true; } + /* register the general verbosity */ + (void) pmix_mca_base_var_register("prte", "pmix", NULL, "server_verbose", + "Debug verbosity for PMIx server", + PMIX_MCA_BASE_VAR_TYPE_INT, + &gen_verbose); + if (0 < gen_verbose && psched_globals.verbosity < gen_verbose) { + if (!opened) { + psched_globals.output = pmix_output_open(NULL); + prte_pmix_server_globals.output = pmix_output_open(NULL); + } + pmix_output_set_verbosity(psched_globals.output, gen_verbose); + pmix_output_set_verbosity(prte_pmix_server_globals.output, gen_verbose); + psched_globals.verbosity = gen_verbose; + } } /* provide a callback function for lost connections to allow us @@ -300,8 +318,6 @@ int 
psched_server_init(pmix_cli_result_t *results) PMIX_CONSTRUCT(&psched_globals.tools, pmix_list_t); psched_globals.syscontroller = *PRTE_NAME_INVALID; - psched_register_params(); - pmix_output_verbose(2, prte_pmix_server_globals.output, "%s server:psched: initialize", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); @@ -535,30 +551,4 @@ void psched_server_finalize(void) if (PMIX_SUCCESS != prc) { PMIX_ERROR_LOG(prc); } - -#if 0 - /* cleanup collectives */ - pmix_server_req_t *cd; - for (int i = 0; i < prte_pmix_server_globals.local_reqs.size; i++) { - cd = (pmix_server_req_t*)pmix_pointer_array_get_item(&prte_pmix_server_globals.local_reqs, i); - if (NULL != cd) { - PMIX_RELEASE(cd); - } - } - for (int i = 0; i < prte_pmix_server_globals.remote_reqs.size; i++) { - cd = (pmix_server_req_t*)pmix_pointer_array_get_item(&prte_pmix_server_globals.remote_reqs, i); - if (NULL != cd) { - PMIX_RELEASE(cd); - } - } - - PMIX_DESTRUCT(&prte_pmix_server_globals.remote_reqs); - PMIX_DESTRUCT(&prte_pmix_server_globals.local_reqs); - PMIX_LIST_DESTRUCT(&prte_pmix_server_globals.notifications); - PMIX_LIST_DESTRUCT(&prte_pmix_server_globals.psets); - PMIX_LIST_DESTRUCT(&prte_pmix_server_globals.groups); - - /* shutdown the local server */ - psched_globals.initialized = false; -#endif } diff --git a/src/tools/psched/session.c b/src/tools/psched/session.c index f45946139e..f6e9ff8ca9 100644 --- a/src/tools/psched/session.c +++ b/src/tools/psched/session.c @@ -15,205 +15,25 @@ #include "src/tools/psched/psched.h" -static void localrelease(void *cbdata) -{ - pmix_server_req_t *req = (pmix_server_req_t*)cbdata; - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - PMIX_RELEASE(req); -} - -static void infocbfunc(pmix_status_t status, - pmix_info_t *info, size_t ninfo, - void *cbdata, - pmix_release_cbfunc_t rel, void *relcbdata) -{ - pmix_server_req_t *req = (pmix_server_req_t*)cbdata; - - if (NULL != req->infocbfunc) { - req->infocbfunc(status, info, ninfo, 
req->cbdata, localrelease, req); - if (NULL != rel) { - rel(relcbdata); - } - return; - } - /* need to cleanup ourselves */ - if (NULL != rel) { - rel(relcbdata); - } - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - PMIX_RELEASE(req); -} - -static void pass_request(int sd, short args, void *cbdata) -{ - prte_pmix_server_op_caddy_t *cd = (prte_pmix_server_op_caddy_t*)cbdata; - pmix_server_req_t *req; - pmix_data_buffer_t *buf; - uint8_t command; - pmix_status_t rc; - - /* create a request tracker for this operation */ - req = PMIX_NEW(pmix_server_req_t); - if (0 < cd->allocdir) { - pmix_asprintf(&req->operation, "ALLOCATE: %u", cd->allocdir); - command = 0; - } else { - pmix_asprintf(&req->operation, "SESSIONCTRL: %u", cd->sessionID); - command = 1; - } - req->infocbfunc = cd->infocbfunc; - req->cbdata = cd->cbdata; - /* add this request to our local request tracker array */ - req->local_index = pmix_pointer_array_add(&prte_pmix_server_globals.local_reqs, req); - - /* if we are the DVM master, then handle this ourselves */ - if (PRTE_PROC_IS_MASTER) { - if (!prte_pmix_server_globals.scheduler_connected) { - /* the scheduler has not attached to us - there is - * nothing we can do */ - rc = PMIX_ERR_NOT_SUPPORTED; - goto callback; - } - - /* if we have not yet set the scheduler as our server, do so */ - if (!prte_pmix_server_globals.scheduler_set_as_server) { - rc = PMIx_tool_set_server(&prte_pmix_server_globals.scheduler, NULL, 0); - if (PMIX_SUCCESS != rc) { - goto callback; - } - prte_pmix_server_globals.scheduler_set_as_server = true; - } - - if (0 == command) { - rc = PMIx_Allocation_request_nb(cd->allocdir, cd->info, cd->ninfo, - infocbfunc, req); - } else { -#if PMIX_NUMERIC_VERSION < 0x00050000 - rc = PMIX_ERR_NOT_SUPPORTED; -#else - rc = PMIx_Session_control(cd->sessionID, cd->info, cd->ninfo, - infocbfunc, req); -#endif - } - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto callback; - } - return; - } - - 
PMIX_DATA_BUFFER_CREATE(buf); - - /* construct a request message for the command */ - rc = PMIx_Data_pack(NULL, buf, &command, 1, PMIX_UINT8); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - goto callback; - } - - /* pack the local requestor ID */ - rc = PMIx_Data_pack(NULL, buf, &req->local_index, 1, PMIX_INT); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - goto callback; - } - - /* pack the requestor */ - rc = PMIx_Data_pack(NULL, buf, &cd->proc, 1, PMIX_PROC); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - goto callback; - } - - if (0 == command) { - /* pack the allocation directive */ - rc = PMIx_Data_pack(NULL, buf, &cd->allocdir, 1, PMIX_ALLOC_DIRECTIVE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - goto callback; - } - } else { - /* pack the sessionID */ - rc = PMIx_Data_pack(NULL, buf, &cd->sessionID, 1, PMIX_UINT32); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - goto callback; - } - } - - /* pack the number of info */ - rc = PMIx_Data_pack(NULL, buf, &cd->ninfo, 1, PMIX_SIZE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - goto callback; - } - if (0 < cd->ninfo) { - /* pack the info */ - rc = PMIx_Data_pack(NULL, buf, cd->info, cd->ninfo, PMIX_INFO); - if (PMIX_SUCCESS != rc) { - 
PMIX_ERROR_LOG(rc); - PMIX_DATA_BUFFER_RELEASE(buf); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - goto callback; - } - } - - /* send this request to the DVM controller - might be us */ - PRTE_RML_SEND(rc, PRTE_PROC_MY_HNP->rank, buf, PRTE_RML_TAG_SCHED); - if (PRTE_SUCCESS != rc) { - PRTE_ERROR_LOG(rc); - pmix_pointer_array_set_item(&prte_pmix_server_globals.local_reqs, req->local_index, NULL); - PMIX_DATA_BUFFER_RELEASE(buf); - goto callback; - } - PMIX_RELEASE(cd); - return; - -callback: - PMIX_RELEASE(cd); - /* this section gets executed solely upon an error */ - if (NULL != req->infocbfunc) { - req->infocbfunc(rc, req->info, req->ninfo, req->cbdata, localrelease, req); - return; - } - PMIX_RELEASE(req); -} - pmix_status_t psched_alloc_fn(const pmix_proc_t *client, pmix_alloc_directive_t directive, const pmix_info_t data[], size_t ndata, pmix_info_cbfunc_t cbfunc, void *cbdata) { - prte_pmix_server_op_caddy_t *cd; - + psched_req_t *req; - pmix_output_verbose(2, prte_pmix_server_globals.output, + pmix_output_verbose(2, psched_globals.output, "%s allocate upcalled on behalf of proc %s:%u with %" PRIsize_t " infos", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), client->nspace, client->rank, ndata); - cd = PMIX_NEW(prte_pmix_server_op_caddy_t); - PMIX_LOAD_PROCID(&cd->proc, client->nspace, client->rank); - cd->allocdir = directive; - cd->info = (pmix_info_t *) data; - cd->ninfo = ndata; - cd->infocbfunc = cbfunc; - cd->cbdata = cbdata; - prte_event_set(prte_event_base, &cd->ev, -1, PRTE_EV_WRITE, pass_request, cd); - prte_event_set_priority(&cd->ev, PRTE_MSG_PRI); - PMIX_POST_OBJECT(cd); - prte_event_active(&cd->ev, PRTE_EV_WRITE, 1); + req = PMIX_NEW(psched_req_t); + PMIX_LOAD_PROCID(&req->requestor, client->nspace, client->rank); + req->directive = directive; + req->data = (pmix_info_t *) data; + req->ndata = ndata; + req->cbfunc = cbfunc; + req->cbdata = cbdata; + PRTE_ACTIVATE_SCHED_STATE(req, PSCHED_STATE_INIT); return 
PRTE_SUCCESS; } @@ -224,24 +44,21 @@ pmix_status_t psched_session_ctrl_fn(const pmix_proc_t *requestor, const pmix_info_t directives[], size_t ndirs, pmix_info_cbfunc_t cbfunc, void *cbdata) { - prte_pmix_server_op_caddy_t *cd; + psched_req_t *req; - pmix_output_verbose(2, prte_pmix_server_globals.output, + pmix_output_verbose(2, psched_globals.output, "%s session ctrl upcalled on behalf of proc %s:%u with %" PRIsize_t " directives", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), requestor->nspace, requestor->rank, ndirs); - cd = PMIX_NEW(prte_pmix_server_op_caddy_t); - PMIX_LOAD_PROCID(&cd->proc, requestor->nspace, requestor->rank); - cd->sessionID = sessionID; - cd->info = (pmix_info_t *) directives; - cd->ninfo = ndirs; - cd->infocbfunc = cbfunc; - cd->cbdata = cbdata; - prte_event_set(prte_event_base, &cd->ev, -1, PRTE_EV_WRITE, pass_request, cd); - prte_event_set_priority(&cd->ev, PRTE_MSG_PRI); - PMIX_POST_OBJECT(cd); - prte_event_active(&cd->ev, PRTE_EV_WRITE, 1); + req = PMIX_NEW(psched_req_t); + PMIX_LOAD_PROCID(&req->requestor, requestor->nspace, requestor->rank); + req->sessionID = sessionID; + req->data = (pmix_info_t *) directives; + req->ndata = ndirs; + req->cbfunc = cbfunc; + req->cbdata = cbdata; + PRTE_ACTIVATE_SCHED_STATE(req, PSCHED_STATE_SESSION_COMPLETE); return PRTE_SUCCESS; } diff --git a/src/tools/psched/state.c b/src/tools/psched/state.c index 90ec73e3d6..0eeaf59890 100644 --- a/src/tools/psched/state.c +++ b/src/tools/psched/state.c @@ -31,6 +31,9 @@ #include "src/mca/state/base/base.h" #include "psched.h" +/* global variables */ +pmix_list_t prte_psched_states; + /* * Module functions: Global */ @@ -40,9 +43,9 @@ static int finalize(void); /* local functions */ static void alloc_complete(int fd, short args, void *cbata); -/* defined default state machine sequence - individual - * plm's must add a state for launching daemons - */ +/* define job state machine sequence - only required to + * allow integration to main PRRTE code for detecting + * base 
allocation */ static prte_job_state_t launch_states[] = { PRTE_JOB_STATE_ALLOCATE, PRTE_JOB_STATE_ALLOCATION_COMPLETE @@ -53,6 +56,21 @@ static prte_state_cbfunc_t launch_callbacks[] = { alloc_complete }; +/* define scheduler state machine sequence for walking + * through an allocation lifecycle; sched_callbacks[i] handles sched_states[i] */ +static prte_sched_state_t sched_states[] = { + PSCHED_STATE_INIT, + PSCHED_STATE_QUEUE, + PSCHED_STATE_SESSION_COMPLETE +}; + +static prte_state_cbfunc_t sched_callbacks[] = { + psched_request_init, + psched_request_queue, + psched_session_complete +}; + + static void force_quit(int fd, short args, void *cbdata) { PRTE_HIDE_UNUSED_PARAMS(fd, args); @@ -62,6 +80,10 @@ static void force_quit(int fd, short args, void *cbdata) PMIX_RELEASE(caddy); } +static int add_psched_state(prte_sched_state_t state, + prte_state_cbfunc_t cbfunc); +static void psched_print_state_machine(void); + /************************ * Local variables ************************/ @@ -120,6 +142,7 @@ static int init(void) /* setup the state machines */ PMIX_CONSTRUCT(&prte_job_states, pmix_list_t); PMIX_CONSTRUCT(&prte_proc_states, pmix_list_t); + PMIX_CONSTRUCT(&prte_psched_states, pmix_list_t); /* setup the job state machine */ num_states = sizeof(launch_states) / sizeof(prte_job_state_t); @@ -150,6 +173,18 @@ static int init(void) prte_state_base_print_job_state_machine(); } + /* setup the scheduler state machine - a failed registration is logged and the loop continues */ + num_states = sizeof(sched_states) / sizeof(prte_sched_state_t); + for (i = 0; i < num_states; i++) { + rc = add_psched_state(sched_states[i], sched_callbacks[i]); + if (PRTE_SUCCESS != rc) { + PRTE_ERROR_LOG(rc); + } + } + if (4 < pmix_output_get_verbosity(psched_globals.output)) { + psched_print_state_machine(); + } + return PRTE_SUCCESS; } @@ -158,6 +193,7 @@ static int finalize(void) /* cleanup the state machines */ PMIX_LIST_DESTRUCT(&prte_proc_states); PMIX_LIST_DESTRUCT(&prte_job_states); + PMIX_LIST_DESTRUCT(&prte_psched_states); return PRTE_SUCCESS; } @@ -169,3 +205,185 @@ static
void alloc_complete(int fd, short args, void *cbdata) PMIX_RELEASE(caddy); } + +void psched_activate_sched_state(psched_req_t *req, prte_sched_state_t state) /* hand req to the callback registered for state */ +{ + psched_state_t *s, *any = NULL, *error = NULL; + + /* search for the handler registered for this state, remembering the ANY and ERROR entries as fallbacks */ + PMIX_LIST_FOREACH(s, &prte_psched_states, psched_state_t) { + if (s->sched_state == PSCHED_STATE_ANY) { + /* save this place */ + any = s; + } + if (s->sched_state == PSCHED_STATE_ERROR) { + error = s; + } + if (s->sched_state == state) { + PRTE_REACHING_SCHED_STATE(req, state); + if (NULL == s->cbfunc) { + pmix_output_verbose(1, psched_globals.output, + "%s NULL CBFUNC FOR SCHED %s STATE %s", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), + (NULL == req->user_refid) ? "N/A" : req->user_refid, + prte_sched_state_to_str(state)); + return; + } + PSCHED_THREADSHIFT(req, s->cbfunc); + return; + } + } + /* if we get here, then the state wasn't found, so execute + * the default handler if it is defined: the ERROR entry for states above PSCHED_STATE_ERROR, otherwise the ANY entry + */ + if (PSCHED_STATE_ERROR < state && NULL != error) { + s = (psched_state_t *) error; + } else if (NULL != any) { + s = (psched_state_t *) any; + } else { + pmix_output_verbose(1, psched_globals.output, + "ACTIVATE: SCHED STATE %s NOT REGISTERED", + prte_sched_state_to_str(state)); + return; /* NOTE(review): req is neither answered nor released on this path - confirm the caller handles it */ + } + if (NULL == s->cbfunc) { + pmix_output_verbose(1, psched_globals.output, + "ACTIVATE: ANY STATE HANDLER NOT DEFINED"); + return; + } + PRTE_REACHING_SCHED_STATE(req, state); + PSCHED_THREADSHIFT(req, s->cbfunc); +} + +static int add_psched_state(prte_sched_state_t state, + prte_state_cbfunc_t cbfunc) /* append a state/callback pair; PRTE_ERR_BAD_PARAM if state is already registered */ +{ + psched_state_t *st; + + /* check for uniqueness */ + PMIX_LIST_FOREACH(st, &prte_psched_states, psched_state_t) { + if (st->sched_state == state) { + pmix_output_verbose(1, psched_globals.output, + "DUPLICATE STATE DEFINED: %s", prte_sched_state_to_str(state)); + return PRTE_ERR_BAD_PARAM; + } + } + + st = PMIX_NEW(psched_state_t); + st->sched_state = state; + st->cbfunc = cbfunc; + pmix_list_append(&prte_psched_states, &(st->super)); + + return
PRTE_SUCCESS; +} + +const char* prte_sched_state_to_str(prte_sched_state_t s) /* printable name for a scheduler state */ +{ + switch (s) { + case PSCHED_STATE_UNDEF: + return "UNDEFINED"; + case PSCHED_STATE_INIT: + return "INIT"; + case PSCHED_STATE_QUEUE: + return "QUEUE"; + case PSCHED_STATE_SESSION_COMPLETE: + return "SESSION COMPLETE"; + default: /* includes PSCHED_STATE_ANY and PSCHED_STATE_ERROR, which have no case above */ + return "UNKNOWN"; + } +} + +static void psched_print_state_machine(void) /* dump every registered state and whether it has a callback */ +{ + psched_state_t *st; + + pmix_output(0, "SCHEDULER STATE MACHINE:"); + PMIX_LIST_FOREACH (st, &prte_psched_states, psched_state_t) { + pmix_output(0, "\tState: %s cbfunc: %s", + prte_sched_state_to_str(st->sched_state), + (NULL == st->cbfunc) ? "NULL" : "DEFINED"); + } +} + + +static void state_con(psched_state_t *p) /* constructor: no state, no callback */ +{ + p->sched_state = PSCHED_STATE_UNDEF; + p->cbfunc = NULL; +} +PMIX_CLASS_INSTANCE(psched_state_t, + pmix_list_item_t, + state_con, NULL); + +static void req_con(psched_req_t *p) /* constructor: clear every field that req_des later frees */ +{ + PMIx_Load_procid(&p->requestor, NULL, PMIX_RANK_INVALID); + p->data = NULL; + p->ndata = 0; + p->user_refid = NULL; + p->alloc_refid = NULL; + p->num_nodes = 0; + p->nlist = NULL; + p->exclude = NULL; + p->num_cpus = 0; + p->ncpulist = NULL; + p->cpulist = NULL; + p->memsize = 0.0; + p->time = NULL; + p->queue = NULL; + p->preemptible = false; + p->lend = NULL; + p->image = NULL; + p->waitall = false; + p->share = false; + p->noshell = false; + p->dependency = NULL; + p->begintime = NULL; + p->state = PSCHED_STATE_UNDEF; + p->sessionID = UINT32_MAX; /* NOTE(review): directive, cbfunc and cbdata are assigned by callers but not set here - confirm the object is zero-filled on allocation */ +} +static void req_des(psched_req_t *p) /* destructor: release everything the request holds */ +{ + if (NULL != p->data) { /* NOTE(review): psched_alloc_fn and psched_session_ctrl_fn store the caller's const array here without copying - confirm the request really owns it before freeing */ + PMIx_Info_free(p->data, p->ndata); + } + if (NULL != p->user_refid) { + free(p->user_refid); + } + if (NULL != p->alloc_refid) { + free(p->alloc_refid); + } + if (NULL != p->nlist) { + free(p->nlist); + } + if (NULL != p->exclude) { + free(p->exclude); + } + if (NULL != p->ncpulist) { + free(p->ncpulist); + } + if (NULL != p->cpulist) { + free(p->cpulist); + } + if (NULL != p->time) { + free(p->time); + } + if (NULL != p->queue) { + free(p->queue); + } + if (NULL !=
p->lend) { + free(p->lend); + } + if (NULL != p->image) { + free(p->image); + } + if (NULL != p->dependency) { + free(p->dependency); + } + if (NULL != p->begintime) { + free(p->begintime); + } +} +PMIX_CLASS_INSTANCE(psched_req_t, + pmix_list_item_t, + req_con, req_des);