Skip to content

Commit

Permalink
Merge pull request #2493 from rhc54/topic/pmix
Browse files Browse the repository at this point in the history
Update to latest PMIx master
  • Loading branch information
rhc54 authored Dec 2, 2016
2 parents 88fbdf8 + 6041467 commit b2e36f0
Show file tree
Hide file tree
Showing 22 changed files with 661 additions and 293 deletions.
6 changes: 3 additions & 3 deletions opal/mca/pmix/pmix3x/pmix/VERSION
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# major, minor, and release are generally combined in the form
# <major>.<minor>.<release>.

major=3
major=2
minor=0
release=0

Expand All @@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".

repo_rev=gitb041846
repo_rev=git211a0ef

# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
Expand All @@ -44,7 +44,7 @@ tarball_version=

# The date when this release was created

date="Oct 27, 2016"
date="Dec 01, 2016"

# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library
Expand Down
1 change: 0 additions & 1 deletion opal/mca/pmix/pmix3x/pmix/config/pmix.m4
Original file line number Diff line number Diff line change
Expand Up @@ -926,4 +926,3 @@ AC_DEFUN([PMIX_DO_AM_CONDITIONALS],[
])
pmix_did_am_conditionals=yes
])dnl

176 changes: 22 additions & 154 deletions opal/mca/pmix/pmix3x/pmix/src/client/pmix_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
#endif /* PMIX_ENABLE_DSTORE */

#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"

#define PMIX_MAX_RETRIES 10

Expand Down Expand Up @@ -191,8 +192,11 @@ static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
}
assert(NULL != nspace);
free(nspace);

/* decode it */
pmix_client_process_nspace_blob(pmix_globals.myid.nspace, buf);
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
pmix_job_data_htable_store(pmix_globals.myid.nspace, buf);
#endif
cb->status = PMIX_SUCCESS;
cb->active = false;
}
Expand Down Expand Up @@ -715,12 +719,27 @@ static void _peersfn(int sd, short args, void *cbdata)
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
pmix_status_t rc;
char **nsprocs=NULL, **nsps=NULL, **tmp;
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
pmix_nspace_t *nsptr;
pmix_nrec_t *nptr;
#endif
size_t i;

/* cycle across our known nspaces */
tmp = NULL;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(cb->nspace, PMIX_RANK_WILDCARD,
cb->key, &cb->value))) {

tmp = pmix_argv_split(cb->value->data.string, ',');
for (i=0; NULL != tmp[i]; i++) {
pmix_argv_append_nosize(&nsps, cb->nspace);
pmix_argv_append_nosize(&nsprocs, tmp[i]);
}
pmix_argv_free(tmp);
tmp = NULL;
}
#else
PMIX_LIST_FOREACH(nsptr, &pmix_globals.nspaces, pmix_nspace_t) {
if (0 == strncmp(nsptr->nspace, cb->nspace, PMIX_MAX_NSLEN)) {
/* cycle across the nodes in this nspace */
Expand All @@ -738,6 +757,7 @@ static void _peersfn(int sd, short args, void *cbdata)
}
}
}
#endif
if (0 == (i = pmix_argv_count(nsps))) {
/* we don't know this nspace */
rc = PMIX_ERR_NOT_FOUND;
Expand Down Expand Up @@ -1010,160 +1030,8 @@ static pmix_status_t send_connect_ack(int sd)
return PMIX_SUCCESS;
}

void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr)
static pmix_status_t usock_connect(struct sockaddr *addr, int *fd)
{
pmix_status_t rc;
int32_t cnt;
int rank;
pmix_kval_t *kptr, *kp2, kv;
pmix_buffer_t buf2;
pmix_byte_object_t *bo;
size_t nnodes, i, j;
pmix_nspace_t *nsptr, *nsptr2;
pmix_nrec_t *nrec, *nr2;
char **procs;

pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: PROCESSING BLOB FOR NSPACE %s", nspace);

/* cycle across our known nspaces */
nsptr = NULL;
PMIX_LIST_FOREACH(nsptr2, &pmix_globals.nspaces, pmix_nspace_t) {
if (0 == strcmp(nsptr2->nspace, nspace)) {
nsptr = nsptr2;
break;
}
}
if (NULL == nsptr) {
/* we don't know this nspace - add it */
nsptr = PMIX_NEW(pmix_nspace_t);
(void)strncpy(nsptr->nspace, nspace, PMIX_MAX_NSLEN);
pmix_list_append(&pmix_globals.nspaces, &nsptr->super);
}

/* unpack any info structs provided */
cnt = 1;
kptr = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(bptr, kptr, &cnt, PMIX_KVAL))) {
if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) {
/* transfer the byte object for unpacking */
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size);
/* start by unpacking the rank */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &rank, &cnt, PMIX_PROC_RANK))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
return;
}
kp2 = PMIX_NEW(pmix_kval_t);
kp2->key = strdup(PMIX_RANK);
PMIX_VALUE_CREATE(kp2->value, 1);
kp2->value->type = PMIX_PROC_RANK;
kp2->value->data.rank = rank;
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp2); // maintain accounting
cnt = 1;
kp2 = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(&buf2, kp2, &cnt, PMIX_KVAL))) {
/* this is data provided by a job-level exchange, so store it
* in the job-level data hash_table */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp2); // maintain accounting
kp2 = PMIX_NEW(pmix_kval_t);
}
/* cleanup */
PMIX_DESTRUCT(&buf2); // releases the original kptr data
PMIX_RELEASE(kp2);
} else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) {
/* transfer the byte object for unpacking */
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size);
/* start by unpacking the number of nodes */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &nnodes, &cnt, PMIX_SIZE))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
return;
}
/* unpack the list of procs on each node */
for (i=0; i < nnodes; i++) {
cnt = 1;
PMIX_CONSTRUCT(&kv, pmix_kval_t);
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &kv, &cnt, PMIX_KVAL))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
PMIX_DESTRUCT(&kv);
return;
}
/* the name of the node is in the key, and the value is
* a comma-delimited list of procs on that node. See if we already
* have this node */
nrec = NULL;
PMIX_LIST_FOREACH(nr2, &nsptr->nodes, pmix_nrec_t) {
if (0 == strcmp(nr2->name, kv.key)) {
nrec = nr2;
break;
}
}
if (NULL == nrec) {
/* Create a node record and store that list */
nrec = PMIX_NEW(pmix_nrec_t);
nrec->name = strdup(kv.key);
pmix_list_append(&nsptr->nodes, &nrec->super);
} else {
/* refresh the list */
if (NULL != nrec->procs) {
free(nrec->procs);
}
}
nrec->procs = strdup(kv.value->data.string);
/* split the list of procs so we can store their
* individual location data */
procs = pmix_argv_split(nrec->procs, ',');
for (j=0; NULL != procs[j]; j++) {
/* store the hostname for each proc - again, this is
* data obtained via a job-level exchange, so store it
* in the job-level data hash_table */
kp2 = PMIX_NEW(pmix_kval_t);
kp2->key = strdup(PMIX_HOSTNAME);
kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
kp2->value->type = PMIX_STRING;
kp2->value->data.string = strdup(nrec->name);
rank = strtol(procs[j], NULL, 10);
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp2); // maintain accounting
}
pmix_argv_free(procs);
PMIX_DESTRUCT(&kv);
}
/* cleanup */
PMIX_DESTRUCT(&buf2); // releases the original kptr data
} else {
/* this is job-level data, so just add it to that hash_table
* with the wildcard rank */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, PMIX_RANK_WILDCARD, kptr))) {
PMIX_ERROR_LOG(rc);
}
}
PMIX_RELEASE(kptr);
kptr = PMIX_NEW(pmix_kval_t);
cnt = 1;
}
/* need to release the leftover kptr */
PMIX_RELEASE(kptr);
}

static pmix_status_t usock_connect(struct sockaddr *addr, int *fd)
{
int sd=-1;
pmix_status_t rc;
pmix_socklen_t addrlen = 0;
Expand Down
5 changes: 4 additions & 1 deletion opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "src/usock/usock.h"

#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"

/* callback for wait completion */
static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
Expand Down Expand Up @@ -313,7 +314,9 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
continue;
}
/* extract and process any proc-related info for this nspace */
pmix_client_process_nspace_blob(nspace, bptr);
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
pmix_job_data_htable_store(nspace, bptr);
#endif
PMIX_RELEASE(bptr);
}
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
Expand Down
35 changes: 32 additions & 3 deletions opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <pmix_rename.h>

#include "src/include/pmix_globals.h"
#include "src/include/pmix_jobdata.h"

#ifdef HAVE_STRING_H
#include <string.h>
Expand Down Expand Up @@ -58,6 +59,7 @@
#endif /* PMIX_ENABLE_DSTORE */

#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"

static pmix_buffer_t* _pack_get(char *nspace, pmix_rank_t rank,
const pmix_info_t info[], size_t ninfo,
Expand Down Expand Up @@ -283,8 +285,12 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
goto done;
}

#if (PMIX_ENABLE_DSTORE == 1)
rc = pmix_dstore_fetch(nptr->nspace, cb->rank, cb->key, &val);
#if (defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
if (PMIX_SUCCESS != (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, cb->key, &val))){
/* DO NOT error log this status - it is perfectly okay
* for a key not to be found */
goto done;
}
#else
/* we received the entire blob for this process, so
* unpack and store it in the modex - this could consist
Expand All @@ -308,7 +314,12 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
return;
}
free(nspace);
pmix_client_process_nspace_blob(cb->nspace, bptr);
pmix_job_data_htable_store(cb->nspace, bptr);

/* Check if the key is in this blob */

pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val);

} else {
cnt = 1;
cur_kval = PMIX_NEW(pmix_kval_t);
Expand Down Expand Up @@ -550,11 +561,29 @@ static void _getnbfn(int fd, short flags, void *cbdata)
return;
}

#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) {
/* found it - we are in an event, so we can
* just execute the callback */
cb->value_cbfunc(rc, val, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
PMIX_RELEASE(cb);
return;
}
#endif

/* if the key is in the PMIx namespace, then they are looking for data
* that was provided at startup */
if (0 == strncmp(cb->key, "pmix", 4)) {
/* should be in the internal hash table. */
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(cb->nspace, cb->rank, cb->key, &val))) {
#else
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) {
#endif
/* found it - we are in an event, so we can
* just execute the callback */
cb->value_cbfunc(rc, val, cb->cbdata);
Expand Down
3 changes: 0 additions & 3 deletions opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ typedef struct {

extern pmix_client_globals_t pmix_client_globals;

void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr);


END_C_DECLS

#endif /* PMIX_CLIENT_OPS_H */
11 changes: 8 additions & 3 deletions opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_spawn.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "src/usock/usock.h"

#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"

static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
pmix_buffer_t *buf, void *cbdata);
Expand Down Expand Up @@ -179,7 +180,7 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
{
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
char nspace[PMIX_MAX_NSLEN+1];
char *n2;
char *n2 = NULL;
pmix_status_t rc, ret;
int32_t cnt;

Expand All @@ -203,10 +204,15 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
PMIX_ERROR_LOG(rc);
ret = rc;
}
pmix_output_verbose(1, pmix_globals.debug_output,
"pmix:client recv '%s'", n2);

if (NULL != n2) {
(void)strncpy(nspace, n2, PMIX_MAX_NSLEN);
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
/* extract and process any proc-related info for this nspace */
pmix_client_process_nspace_blob(nspace, buf);
pmix_job_data_htable_store(nspace, buf);
#endif
free(n2);
}
}
Expand All @@ -226,4 +232,3 @@ static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata)
}
cb->active = false;
}

Loading

0 comments on commit b2e36f0

Please sign in to comment.