diff --git a/src/mca/plm/base/plm_base_frame.c b/src/mca/plm/base/plm_base_frame.c index ad7a2138d0..b154006e18 100644 --- a/src/mca/plm/base/plm_base_frame.c +++ b/src/mca/plm/base/plm_base_frame.c @@ -15,7 +15,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2018-2019 Intel, Inc. All rights reserved. * Copyright (c) 2020 Cisco Systems, Inc. All rights reserved - * Copyright (c) 2021 Nanook Consulting. All rights reserved. + * Copyright (c) 2021-2022 Nanook Consulting. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -116,6 +116,8 @@ static int prte_plm_base_close(void) if (NULL != prte_plm_globals.base_nspace) { free(prte_plm_globals.base_nspace); } + while (NULL != pmix_list_remove_first(&prte_plm_globals.daemon_cache)); // do not release list items! + PMIX_DESTRUCT(&prte_plm_globals.daemon_cache); return prte_mca_base_framework_components_close(&prte_plm_base_framework, NULL); } @@ -132,6 +134,8 @@ static int prte_plm_base_open(prte_mca_base_open_flag_t flags) /* default to assigning daemons to nodes at launch */ prte_plm_globals.daemon_nodes_assigned_at_launch = true; + PMIX_CONSTRUCT(&prte_plm_globals.daemon_cache, pmix_list_t); + /* Open up all available components */ return prte_mca_base_framework_components_open(&prte_plm_base_framework, flags); } diff --git a/src/mca/plm/base/plm_base_launch_support.c b/src/mca/plm/base/plm_base_launch_support.c index bf4bd3f22d..ae64d533f4 100644 --- a/src/mca/plm/base/plm_base_launch_support.c +++ b/src/mca/plm/base/plm_base_launch_support.c @@ -1352,7 +1352,7 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu char *ptr; int idx; pmix_status_t ret; - prte_proc_t *daemon = NULL; + prte_proc_t *daemon = NULL, *dptr; prte_job_t *jdata; pmix_proc_t dname; pmix_data_buffer_t *relay; @@ -1376,6 +1376,8 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu pmix_data_buffer_t datbuf, *data; pmix_topology_t ptopo; pmix_value_t cnctinfo; + bool daemon1_has_reported = false; + PRTE_HIDE_UNUSED_PARAMS(status, sender, tag, cbdata); /* get the daemon job, if necessary */ @@ -1532,6 +1534,7 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu /* rank=1 always sends its topology back */ topo = NULL; if (1 == dname.rank) { + daemon1_has_reported = true; PMIX_DATA_BUFFER_CONSTRUCT(&datbuf); /* unpack the flag to see if this payload is compressed */ idx = 1; @@ -1552,6 +1555,7 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu /* only need to process it if our signatures differ */ if (0 == strcmp(sig, mytopo->sig)) { PMIX_BYTE_OBJECT_DESTRUCT(&pbo); + topo = mytopo->topo; } else { if (compressed) { /* decompress the data */ @@ -1597,14 +1601,35 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu topo = ptopo.topology; ptopo.topology = NULL; PMIX_TOPOLOGY_DESTRUCT(&ptopo); - /* update the node's available processors */ - if (NULL != daemon->node->available) { - hwloc_bitmap_free(daemon->node->available); - } - daemon->node->available = prte_hwloc_base_filter_cpus(topo); /* cleanup */ PMIX_DATA_BUFFER_DESTRUCT(data); } + /* process any cached daemons */ + while (NULL != (dptr = (prte_proc_t*)pmix_list_remove_first(&prte_plm_globals.daemon_cache))) { + if (0 == strcmp(dptr->node->topology->sig, sig)) { + dptr->node->available = prte_hwloc_base_filter_cpus(topo); + jdatorted->num_reported++; + } else { + /* we need to request this topology */ + PMIX_DATA_BUFFER_CREATE(relay); + cmd = PRTE_DAEMON_REPORT_TOPOLOGY_CMD; + ret = PMIx_Data_pack(NULL, relay, &cmd, 1, PMIX_UINT8); + if (PMIX_SUCCESS != ret) { + PMIX_ERROR_LOG(ret); + PMIX_DATA_BUFFER_RELEASE(relay); + prted_failed_launch = true; + goto CLEANUP; + } + /* send it */ + PRTE_RML_SEND(ret, dptr->name.rank, relay, PRTE_RML_TAG_DAEMON); + if (PRTE_SUCCESS != ret) { + PRTE_ERROR_LOG(ret); + PMIX_DATA_BUFFER_RELEASE(relay); + prted_failed_launch = true; + goto CLEANUP; + } + } + } } /* see if they provided their inventory */ @@ -1666,7 +1691,10 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu } } - /* do we already have this topology from some other node? */ + /* do we already have this topology from some other node? + * NOTE: if this is daemon1, then topo will NOT be NULL - it + * will either point to mytopo or to the daemon1 topo. If it + * is NOT daemon1, then it will be NULL */ found = false; for (i = 0; i < prte_node_topologies->size; i++) { t = (prte_topology_t *) pmix_pointer_array_get_item(prte_node_topologies, i); @@ -1676,32 +1704,50 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu /* just check the signature */ if (0 == strcmp(sig, t->sig)) { PRTE_OUTPUT_VERBOSE((5, prte_plm_base_framework.framework_output, - "%s TOPOLOGY ALREADY RECORDED", + "%s TOPOLOGY SIGNATURE ALREADY RECORDED", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); - found = true; daemon->node->topology = t; - daemon->node->available = prte_hwloc_base_filter_cpus(t->topo); - if (NULL != topo) { + /* the topology in this struct can be NULL in the case + * where an earlier daemon other than daemon1 reported the + * signature but did not include its topology */ + if (NULL == t->topo) { + if (NULL == topo) { + /* must not be from daemon1 - treat as not found */ + break; + } + /* daemon1 would have included the topology, so we + * can pick it up here */ + t->topo = topo; + } else if (NULL != topo && topo != mytopo->topo) { + /* we already have the topology */ hwloc_topology_destroy(topo); } + /* update the node's available processors */ + if (NULL != daemon->node->available) { + hwloc_bitmap_free(daemon->node->available); + } + daemon->node->available = prte_hwloc_base_filter_cpus(t->topo); free(sig); + found = true; break; } } if (!found) { - /* nope - save the signature and request the complete topology from that node */ + /* nope - save the signature */ PRTE_OUTPUT_VERBOSE((5, prte_plm_base_framework.framework_output, - "%s NEW TOPOLOGY - ADDING", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); + "%s NEW TOPOLOGY - ADDING", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); t = PMIX_NEW(prte_topology_t); t->sig = sig; t->index = pmix_pointer_array_add(prte_node_topologies, t); daemon->node->topology = t; - if (NULL != topo) { - /* Apply any CPU filters (not preserved by the XML) */ - daemon->node->available = prte_hwloc_base_filter_cpus(topo); - t->topo = topo; + /* if daemon1 has not reported, then cache this daemon + * for later processing */ + if (!daemon1_has_reported) { + pmix_list_append(&prte_plm_globals.daemon_cache, &daemon->super); } else { + /* request the complete topology from that node */ PRTE_OUTPUT_VERBOSE((5, prte_plm_base_framework.framework_output, "%s REQUESTING TOPOLOGY FROM %s", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), PRTE_NAME_PRINT(&dname))); @@ -1723,15 +1769,15 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu prted_failed_launch = true; goto CLEANUP; } - /* we will count this node as completed - * when we get the full topology back */ - if (NULL != nodename) { - free(nodename); - nodename = NULL; - } - idx = 1; - continue; } + /* we will count this node as completed + * when we get the full topology back */ + if (NULL != nodename) { + free(nodename); + nodename = NULL; + } + idx = 1; + continue; } CLEANUP: @@ -1780,6 +1826,7 @@ void prte_plm_base_daemon_callback(int status, pmix_proc_t *sender, pmix_data_bu } idx = 1; } + if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) { PMIX_ERROR_LOG(ret); PRTE_ACTIVATE_JOB_STATE(jdatorted, PRTE_JOB_STATE_FAILED_TO_START); diff --git a/src/mca/plm/base/plm_private.h b/src/mca/plm/base/plm_private.h index 340d472215..26ac743193 100644 --- a/src/mca/plm/base/plm_private.h +++ b/src/mca/plm/base/plm_private.h @@ -64,6 +64,7 @@ typedef struct { /* daemon nodes assigned at launch */ bool daemon_nodes_assigned_at_launch; size_t node_regex_threshold; + pmix_list_t daemon_cache; } prte_plm_globals_t; /** * Global instance of PLM framework data