diff --git a/include/pmix_common.h b/include/pmix_common.h index 7bc9a8ce89..2be2f629b0 100644 --- a/include/pmix_common.h +++ b/include/pmix_common.h @@ -131,6 +131,10 @@ typedef uint32_t pmix_rank_t; #define PMIX_GRPID "pmix.egid" // (uint32_t) effective group id #define PMIX_DSTPATH "pmix.dstpath" // (char*) path to dstore files #define PMIX_VERSION_INFO "pmix.version" // (char*) PMIx version of contactor +#define PMIX_PROGRAMMING_MODEL "pmix.pgm.model" // (char*) programming model being initialized (e.g., "MPI" or "OpenMP") +#define PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH") +#define PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1") +#define PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads") /* attributes for the USOCK rendezvous socket */ @@ -531,6 +535,7 @@ typedef int pmix_status_t; #define PMIX_ERR_EVENT_REGISTRATION (PMIX_ERR_OP_BASE - 14) #define PMIX_ERR_JOB_TERMINATED (PMIX_ERR_OP_BASE - 15) #define PMIX_ERR_UPDATE_ENDPOINTS (PMIX_ERR_OP_BASE - 16) +#define PMIX_MODEL_DECLARED (PMIX_ERR_OP_BASE - 17) /* define a starting point for system error constants so * we avoid renumbering when making additions */ diff --git a/src/client/pmix_client.c b/src/client/pmix_client.c index b93c5d5262..882427a53a 100644 --- a/src/client/pmix_client.c +++ b/src/client/pmix_client.c @@ -238,6 +238,79 @@ static void evhandler_reg_callbk(pmix_status_t status, *active = status; } +typedef struct { + pmix_info_t *info; + size_t ninfo; +} mydata_t; + +static void release_info(pmix_status_t status, void *cbdata) +{ + mydata_t *cd = (mydata_t*)cbdata; + PMIX_INFO_FREE(cd->info, cd->ninfo); + free(cd); +} + +static void _check_for_notify(pmix_info_t info[], size_t ninfo) +{ + mydata_t *cd; + size_t n, m=0; + pmix_info_t *model=NULL, *library=NULL, *vers=NULL, *tmod=NULL; + + for (n=0; n < ninfo; n++) { + if (0 == strncmp(info[n].key, PMIX_PROGRAMMING_MODEL, PMIX_MAX_KEYLEN)) { + /* we need to generate an event indicating that + * a programming model has been declared */ + model = &info[n]; + ++m; + } else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_NAME, PMIX_MAX_KEYLEN)) { + library = &info[n]; + ++m; + } else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_VERSION, PMIX_MAX_KEYLEN)) { + vers = &info[n]; + ++m; + } else if (0 == strncmp(info[n].key, PMIX_THREADING_MODEL, PMIX_MAX_KEYLEN)) { + tmod = &info[n]; + ++m; + } + } + if (0 < m) { + /* notify anyone listening that a model has been declared */ + cd = (mydata_t*)malloc(sizeof(mydata_t)); + if (NULL == cd) { + /* nothing we can do */ + return; + } + PMIX_INFO_CREATE(cd->info, m+1); + if (NULL == cd->info) { + free(cd); + return; + } + cd->ninfo = m+1; + n = 0; + if (NULL != model) { + PMIX_INFO_XFER(&cd->info[n], model); + ++n; + } + if (NULL != library) { + PMIX_INFO_XFER(&cd->info[n], library); + ++n; + } + if (NULL != vers) { + PMIX_INFO_XFER(&cd->info[n], vers); + ++n; + } + if (NULL != tmod) { + PMIX_INFO_XFER(&cd->info[n], tmod); + ++n; + } + /* mark that it is not to go to any default handlers */ + PMIX_INFO_LOAD(&cd->info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL); + PMIx_Notify_event(PMIX_MODEL_DECLARED, + &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL, + cd->info, cd->ninfo, release_info, (void*)cd); + } +} + PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, pmix_info_t info[], size_t ninfo) { @@ -267,6 +340,12 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, (void)strncpy(proc->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN); proc->rank = pmix_globals.myid.rank; } + /* we also need to check the info keys to see if something need + * be done with them - e.g., to notify another library that we + * also have called init */ + if (NULL != info) { + _check_for_notify(info, ninfo); + } ++pmix_globals.init_cntr; pmix_mutex_unlock(&pmix_client_bootstrap_mutex); return PMIX_SUCCESS; @@ -396,6 +475,11 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, } PMIX_INFO_DESTRUCT(&ginfo); + /* check to see if we need to notify anyone */ + if (NULL != info) { + _check_for_notify(info, ninfo); + } + pmix_mutex_unlock(&pmix_client_bootstrap_mutex); return PMIX_SUCCESS; diff --git a/src/event/pmix_event.h b/src/event/pmix_event.h index e9ebd33318..2899faa9a6 100644 --- a/src/event/pmix_event.h +++ b/src/event/pmix_event.h @@ -125,11 +125,14 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, pmix_event_chain_t *_ch; \ _ch = PMIX_NEW(pmix_event_chain_t); \ _ch->status = (e); \ - _ch->ninfo = 1; \ + _ch->ninfo = 2; \ _ch->final_cbfunc = (f); \ _ch->final_cbdata = _ch; \ PMIX_INFO_CREATE(_ch->info, _ch->ninfo); \ PMIX_INFO_LOAD(&_ch->info[0], \ + PMIX_EVENT_HDLR_NAME, \ + NULL, PMIX_STRING); \ + PMIX_INFO_LOAD(&_ch->info[1], \ PMIX_EVENT_RETURN_OBJECT, \ NULL, PMIX_POINTER); \ pmix_invoke_local_event_hdlr(_ch); \ diff --git a/src/event/pmix_event_notification.c b/src/event/pmix_event_notification.c index 83474169fd..38f93bd6f4 100644 --- a/src/event/pmix_event_notification.c +++ b/src/event/pmix_event_notification.c @@ -94,7 +94,7 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, pmix_cb_t *cb; pmix_event_chain_t *chain; size_t n; - + pmix_notify_caddy_t *cd, *rbout; pmix_output_verbose(2, pmix_globals.debug_output, "client: notifying server %s:%d of status %s", @@ -104,36 +104,39 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, if (!pmix_globals.connected) { return PMIX_ERR_UNREACH; } - /* create the msg object */ - msg = PMIX_NEW(pmix_buffer_t); - /* pack the command */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &cmd, 1, PMIX_CMD))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* pack the status */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &status, 1, PMIX_STATUS))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* no need to pack the source as it is us */ + if (PMIX_RANGE_PROC_LOCAL != range) { + /* create the msg object */ + msg = PMIX_NEW(pmix_buffer_t); - /* pack the range */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* pack the info */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &ninfo, 1, PMIX_SIZE))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - if (0 < ninfo) { - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, info, ninfo, PMIX_INFO))) { + /* pack the command */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &cmd, 1, PMIX_CMD))) { PMIX_ERROR_LOG(rc); goto cleanup; } + /* pack the status */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &status, 1, PMIX_STATUS))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + /* no need to pack the source as it is us */ + + /* pack the range */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + /* pack the info */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &ninfo, 1, PMIX_SIZE))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + if (0 < ninfo) { + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, info, ninfo, PMIX_INFO))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + } } /* setup for our own local callbacks */ @@ -141,8 +144,9 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, chain->status = status; (void)strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN); chain->source.rank = pmix_globals.myid.rank; - /* we always leave space for a callback object */ - chain->ninfo = ninfo + 1; + /* we always leave space for a callback object and + * the evhandler name. */ + chain->ninfo = ninfo + 2; PMIX_INFO_CREATE(chain->info, chain->ninfo); if (0 < ninfo) { @@ -151,29 +155,84 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, PMIX_INFO_XFER(&chain->info[n], &info[n]); } } + /* put the evhandler name tag in the next-to-last element - we + * will fill it in as each handler is called */ + PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING); /* now put the callback object tag in the last element */ - PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); - - /* create a callback object as we need to pass it to the - * recv routine so we know which callback to use when - * the server acks/nacks the register events request*/ - cb = PMIX_NEW(pmix_cb_t); - cb->op_cbfunc = cbfunc; - cb->cbdata = cbdata; - /* send to the server */ - pmix_output_verbose(2, pmix_globals.debug_output, - "client: notifying server %s:%d - sending", - pmix_globals.myid.nspace, pmix_globals.myid.rank); - rc = pmix_ptl.send_recv(&pmix_client_globals.myserver, msg, notify_event_cbfunc, cb); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(cb); - goto cleanup; + PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); + + /* we need to cache this event so we can pass it into + * ourselves should someone later register for it */ + cd = PMIX_NEW(pmix_notify_caddy_t); + cd->status = status; + if (NULL == source) { + (void)strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN); + cd->source.rank = PMIX_RANK_UNDEF; + } else { + (void)strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN); + cd->source.rank = source->rank; + } + cd->range = range; + + /* check for directives */ + if (NULL != info) { + cd->ninfo = chain->ninfo; + PMIX_INFO_CREATE(cd->info, cd->ninfo); + for (n=0; n < chain->ninfo; n++) { + PMIX_INFO_XFER(&cd->info[n], &chain->info[n]); + if (0 == strncmp(cd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { + cd->nondefault = true; + } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) { + /* provides an array of pmix_proc_t identifying the procs + * that are to receive this notification, or a single pmix_proc_t */ + if (PMIX_DATA_ARRAY == cd->info[n].value.type && + NULL != cd->info[n].value.data.darray && + NULL != cd->info[n].value.data.darray->array) { + cd->ntargets = cd->info[n].value.data.darray->size; + PMIX_PROC_CREATE(cd->targets, cd->ntargets); + memcpy(cd->targets, cd->info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t)); + } else if (PMIX_PROC == cd->info[n].value.type) { + cd->ntargets = 1; + PMIX_PROC_CREATE(cd->targets, cd->ntargets); + memcpy(cd->targets, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); + } else { + /* this is an error */ + PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); + return PMIX_ERR_BAD_PARAM; + } + } + } + } + /* add to our cache */ + rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd); + /* if an older event was bumped, release it */ + if (NULL != rbout) { + PMIX_RELEASE(rbout); + } + + if (PMIX_RANGE_PROC_LOCAL != range) { + /* create a callback object as we need to pass it to the + * recv routine so we know which callback to use when + * the server acks/nacks the register events request. The + * server will _not_ send this notification back to us, + * so we handle it locally */ + cb = PMIX_NEW(pmix_cb_t); + cb->op_cbfunc = cbfunc; + cb->cbdata = cbdata; + /* send to the server */ + pmix_output_verbose(2, pmix_globals.debug_output, + "client: notifying server %s:%d - sending", + pmix_globals.myid.nspace, pmix_globals.myid.rank); + rc = pmix_ptl.send_recv(&pmix_client_globals.myserver, msg, notify_event_cbfunc, cb); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(cb); + goto cleanup; + } } /* now notify any matching registered callbacks we have */ pmix_invoke_local_event_hdlr(chain); - PMIX_RELEASE(chain); // maintain accounting return PMIX_SUCCESS; @@ -245,7 +304,7 @@ static void progress_local_event_hdlr(pmix_status_t status, chain->nresults = cnt; /* if the caller indicates that the chain is completed, - * or we completed the "last" event, then stop here */ + * or we completed the "last" event */ if (PMIX_EVENT_ACTION_COMPLETE == status || chain->endchain) { goto complete; } @@ -261,6 +320,13 @@ static void progress_local_event_hdlr(pmix_status_t status, if (nxt->codes[0] == chain->status && check_range(&nxt->rng, &chain->source)) { chain->evhdlr = nxt; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject; nxt->evhdlr(nxt->index, @@ -294,6 +360,13 @@ static void progress_local_event_hdlr(pmix_status_t status, * the source fits within it */ if (nxt->codes[n] == chain->status) { chain->evhdlr = nxt; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject; nxt->evhdlr(nxt->index, @@ -321,6 +394,13 @@ static void progress_local_event_hdlr(pmix_status_t status, * the source fits within it */ if (check_range(&nxt->rng, &chain->source)) { chain->evhdlr = nxt; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject; nxt->evhdlr(nxt->index, @@ -341,6 +421,13 @@ static void progress_local_event_hdlr(pmix_status_t status, if (1 == pmix_globals.events.last->ncodes && pmix_globals.events.last->codes[0] == chain->status) { chain->evhdlr = pmix_globals.events.last; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject; chain->evhdlr->evhdlr(chain->evhdlr->index, @@ -354,6 +441,13 @@ static void progress_local_event_hdlr(pmix_status_t status, for (n=0; n < pmix_globals.events.last->ncodes; n++) { if (pmix_globals.events.last->codes[n] == chain->status) { chain->evhdlr = pmix_globals.events.last; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject; chain->evhdlr->evhdlr(chain->evhdlr->index, @@ -367,6 +461,13 @@ static void progress_local_event_hdlr(pmix_status_t status, } else { /* gets run for all codes */ chain->evhdlr = pmix_globals.events.last; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject; chain->evhdlr->evhdlr(chain->evhdlr->index, @@ -411,8 +512,9 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) bool found; pmix_output_verbose(2, pmix_globals.debug_output, - "%s:%d invoke_local_event_hdlr", - pmix_globals.myid.nspace, pmix_globals.myid.rank); + "%s:%d invoke_local_event_hdlr for status %s", + pmix_globals.myid.nspace, pmix_globals.myid.rank, + PMIx_Error_string(chain->status)); /* sanity check */ if (NULL == chain->info) { @@ -490,19 +592,42 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) } } - /* if they didn't want it to go to a default handler, then we are done */ - if (chain->nondefault) { - goto complete; + /* if they didn't want it to go to a default handler, then ignore them */ + if (!chain->nondefault) { + /* pass it to any default handlers */ + PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) { + if (check_range(&evhdlr->rng, &chain->source)) { + /* invoke the handler */ + chain->evhdlr = evhdlr; + goto invk; + } + } } - /* finally, pass it to any default handlers */ - PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) { - if (check_range(&evhdlr->rng, &chain->source)) { - /* invoke the handler */ - chain->evhdlr = evhdlr; + /* if we registered a "last" handler, and it fits the given range + * and code, then invoke it now */ + if (NULL != pmix_globals.events.last && + check_range(&pmix_globals.events.last->rng, &chain->source)) { + chain->endchain = true; // ensure we don't do this again + if (1 == pmix_globals.events.last->ncodes && + pmix_globals.events.last->codes[0] == chain->status) { + chain->evhdlr = pmix_globals.events.last; + goto invk; + } else if (NULL != pmix_globals.events.last->codes) { + /* need to check if this code is included in the array */ + for (i=0; i < pmix_globals.events.last->ncodes; i++) { + if (pmix_globals.events.last->codes[i] == chain->status) { + chain->evhdlr = pmix_globals.events.last; + goto invk; + } + } + } else { + /* gets run for all codes */ + chain->evhdlr = pmix_globals.events.last; goto invk; } } + /* if we got here, then nothing was found */ complete: /* we still have to call their final callback */ @@ -514,9 +639,18 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) invk: /* invoke the handler */ + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } chain->info[chain->ninfo-1].value.data.ptr = chain->evhdlr->cbobject; pmix_output_verbose(2, pmix_globals.debug_output, - "[%s:%d] INVOKING EVHDLR", __FILE__, __LINE__); + "[%s:%d] INVOKING EVHDLR %s", __FILE__, __LINE__, + (NULL == chain->evhdlr->name) ? + "NULL" : chain->evhdlr->name); chain->evhdlr->evhdlr(chain->evhdlr->index, chain->status, &chain->source, chain->info, chain->ninfo, @@ -544,7 +678,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) * the message until all local procs have received it, or it ages to * the point where it gets pushed out by more recent events */ PMIX_RETAIN(cd); - rbout = pmix_ring_buffer_push(&pmix_server_globals.notifications, cd); + rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd); /* if an older event was bumped, release it */ if (NULL != rbout) { @@ -558,7 +692,8 @@ static void _notify_client_event(int sd, short args, void *cbdata) cd->status == reginfoptr->code) { PMIX_LIST_FOREACH(pr, ®infoptr->peers, pmix_peer_events_info_t) { /* if this client was the source of the event, then - * don't send it back */ + * don't send it back as they will have processed it + * when they generated it */ if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) && cd->source.rank == pr->peer->info->rank) { continue; diff --git a/src/event/pmix_event_registration.c b/src/event/pmix_event_registration.c index 134bece6ea..68bb7abb64 100644 --- a/src/event/pmix_event_registration.c +++ b/src/event/pmix_event_registration.c @@ -325,20 +325,22 @@ static pmix_status_t _add_hdlr(pmix_rshift_caddy_t *cd, pmix_list_t *xfer) static void reg_event_hdlr(int sd, short args, void *cbdata) { - size_t index = 0, n; - pmix_status_t rc; pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)cbdata; + size_t index = 0, n, i; + pmix_status_t rc; pmix_event_hdlr_t *evhdlr, *ev; uint8_t location = PMIX_EVENT_ORDER_NONE; char *name = NULL, *locator = NULL; bool firstoverall=false, lastoverall=false; - bool found; + bool found, matched; pmix_list_t xfer; pmix_info_caddy_t *ixfer; void *cbobject = NULL; pmix_data_range_t range = PMIX_RANGE_UNDEF; pmix_proc_t *parray = NULL; size_t nprocs; + pmix_notify_caddy_t *ncd; + pmix_event_chain_t *chain; pmix_output_verbose(2, pmix_globals.debug_output, "pmix: register event_hdlr with %d infos", (int)cd->ninfo); @@ -672,6 +674,66 @@ static void reg_event_hdlr(int sd, short args, void *cbdata) cd->evregcbfn(rc, index, cd->cbdata); } + /* check if any matching notifications have been cached */ + for (i=0; i < (size_t)pmix_globals.notifications.size; i++) { + if (NULL == (ncd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_globals.notifications, i))) { + break; + } + found = false; + if (NULL == cd->codes) { + /* they registered a default event handler - always matches */ + found = true; + } else { + for (n=0; n < cd->ncodes; n++) { + if (cd->codes[n] == ncd->status) { + found = true; + break; + } + } + } + if (found) { + /* if we were given specific targets, check if we are one */ + if (NULL != ncd->targets) { + matched = false; + for (n=0; n < ncd->ntargets; n++) { + if (0 != strncmp(pmix_globals.myid.nspace, ncd->targets[n].nspace, PMIX_MAX_NSLEN)) { + continue; + } + if (PMIX_RANK_WILDCARD == ncd->targets[n].rank || + pmix_globals.myid.rank == ncd->targets[n].rank) { + matched = true; + break; + } + } + if (!matched) { + /* do not notify this one */ + continue; + } + } + /* all matches - notify */ + chain = PMIX_NEW(pmix_event_chain_t); + chain->status = ncd->status; + (void)strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN); + chain->source.rank = pmix_globals.myid.rank; + /* we already left space for evhandler name plus + * a callback object when we cached the notification */ + chain->ninfo = ncd->ninfo; + PMIX_INFO_CREATE(chain->info, chain->ninfo); + if (0 < cd->ninfo) { + /* need to copy the info */ + for (n=0; n < ncd->ninfo; n++) { + PMIX_INFO_XFER(&chain->info[n], &ncd->info[n]); + } + } + /* we don't want this chain to propagate, so indicate it + * should only be run as a single-shot */ + chain->endchain = true; + /* now notify any matching registered callbacks we have */ + pmix_invoke_local_event_hdlr(chain); + } + } + + /* all done */ PMIX_RELEASE(cd); } diff --git a/src/include/pmix_globals.h b/src/include/pmix_globals.h index 1333cb24f1..300ea224dd 100644 --- a/src/include/pmix_globals.h +++ b/src/include/pmix_globals.h @@ -36,6 +36,7 @@ #include "src/buffer_ops/types.h" #include "src/class/pmix_hash_table.h" #include "src/class/pmix_list.h" +#include "src/class/pmix_ring_buffer.h" #include "src/event/pmix_event.h" #include "src/mca/psec/psec.h" @@ -358,21 +359,22 @@ PMIX_CLASS_DECLARATION(pmix_info_caddy_t); * between various parts of the code library. Both the client * and server libraries must instance this structure */ typedef struct { - int init_cntr; // #times someone called Init - #times called Finalize + int init_cntr; // #times someone called Init - #times called Finalize pmix_proc_t myid; - pmix_peer_t *mypeer; // my own peer object + pmix_peer_t *mypeer; // my own peer object pmix_proc_type_t proc_type; - uid_t uid; // my effective uid - gid_t gid; // my effective gid + uid_t uid; // my effective uid + gid_t gid; // my effective gid int pindex; pmix_event_base_t *evbase; bool external_evbase; int debug_output; - pmix_events_t events; // my event handler registrations. + pmix_events_t events; // my event handler registrations. bool connected; - pmix_list_t nspaces; // list of pmix_nspace_t for the nspaces we know about - pmix_buffer_t *cache_local; // data PUT by me to local scope - pmix_buffer_t *cache_remote; // data PUT by me to remote scope + pmix_list_t nspaces; // list of pmix_nspace_t for the nspaces we know about + pmix_buffer_t *cache_local; // data PUT by me to local scope + pmix_buffer_t *cache_remote; // data PUT by me to remote scope + pmix_ring_buffer_t notifications; // ring buffer of pending notifications } pmix_globals_t; diff --git a/src/server/pmix_server.c b/src/server/pmix_server.c index 470ac4b20b..9bdbe66c87 100644 --- a/src/server/pmix_server.c +++ b/src/server/pmix_server.c @@ -105,6 +105,10 @@ static pmix_status_t initialize_server_base(pmix_server_module_t *module) pmix_globals.myid.rank = strtol(evar, NULL, 10); } + /* construct the global notification ring buffer */ + PMIX_CONSTRUCT(&pmix_globals.notifications, pmix_ring_buffer_t); + pmix_ring_buffer_init(&pmix_globals.notifications, 256); + /* setup the server-specific globals */ PMIX_CONSTRUCT(&pmix_server_globals.clients, pmix_pointer_array_t); pmix_pointer_array_init(&pmix_server_globals.clients, 1, INT_MAX, 1); @@ -113,8 +117,6 @@ static pmix_status_t initialize_server_base(pmix_server_module_t *module) PMIX_CONSTRUCT(&pmix_server_globals.gdata, pmix_buffer_t); PMIX_CONSTRUCT(&pmix_server_globals.events, pmix_list_t); PMIX_CONSTRUCT(&pmix_server_globals.local_reqs, pmix_list_t); - PMIX_CONSTRUCT(&pmix_server_globals.notifications, pmix_ring_buffer_t); - pmix_ring_buffer_init(&pmix_server_globals.notifications, 256); pmix_output_verbose(2, pmix_globals.debug_output, "pmix:server init called"); @@ -261,7 +263,7 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void) PMIX_LIST_DESTRUCT(&pmix_server_globals.remote_pnd); PMIX_LIST_DESTRUCT(&pmix_server_globals.local_reqs); PMIX_DESTRUCT(&pmix_server_globals.gdata); - PMIX_DESTRUCT(&pmix_server_globals.notifications); + PMIX_DESTRUCT(&pmix_globals.notifications); PMIX_LIST_DESTRUCT(&pmix_server_globals.events); if (NULL != security_mode) { diff --git a/src/server/pmix_server_ops.c b/src/server/pmix_server_ops.c index 5add656abf..97fdd7cdfe 100644 --- a/src/server/pmix_server_ops.c +++ b/src/server/pmix_server_ops.c @@ -1160,8 +1160,8 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, check: /* check if any matching notifications have been cached */ - for (i=0; i < pmix_server_globals.notifications.size; i++) { - if (NULL == (cd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_server_globals.notifications, i))) { + for (i=0; i < pmix_globals.notifications.size; i++) { + if (NULL == (cd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_globals.notifications, i))) { break; } found = false; diff --git a/src/server/pmix_server_ops.h b/src/server/pmix_server_ops.h index f502cd33a3..f978e058b3 100644 --- a/src/server/pmix_server_ops.h +++ b/src/server/pmix_server_ops.h @@ -111,7 +111,6 @@ typedef struct { pmix_list_t local_reqs; // list of pmix_dmdx_local_t awaiting arrival of data from local neighbours pmix_buffer_t gdata; // cache of data given to me for passing to all clients pmix_list_t events; // list of pmix_regevents_info_t registered events - pmix_ring_buffer_t notifications; // ring buffer of pending notifications bool tool_connections_allowed; } pmix_server_globals_t; diff --git a/src/util/error.c b/src/util/error.c index d75bc2cd78..29ee09f129 100644 --- a/src/util/error.c +++ b/src/util/error.c @@ -167,6 +167,8 @@ PMIX_EXPORT const char* PMIx_Error_string(pmix_status_t errnum) return "PMIX HEARTBEAT ALERT"; case PMIX_MONITOR_FILE_ALERT: return "PMIX FILE MONITOR ALERT"; + case PMIX_MODEL_DECLARED: + return "PMIX MODEL DECLARED"; case PMIX_SUCCESS: return "SUCCESS"; default: