Skip to content

Commit

Permalink
Merge pull request #3696 from rhc54/topic/pmix3
Browse files Browse the repository at this point in the history
Update PMIx and integration glue
  • Loading branch information
Ralph Castain authored Jun 20, 2017
2 parents 1f291c8 + 952726c commit 814e858
Show file tree
Hide file tree
Showing 62 changed files with 2,436 additions and 1,649 deletions.
9 changes: 4 additions & 5 deletions ompi/interlib/interlib.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,9 @@ int ompi_interlib_declare(int threadlevel, char *version)
}
opal_list_append(&info, &kv->super);
/* call pmix to initialize these values */
if (OPAL_SUCCESS != (ret = opal_pmix.init(&info))) {
OPAL_LIST_DESTRUCT(&info);
return ret;
}
ret = opal_pmix.init(&info);
OPAL_LIST_DESTRUCT(&info);
return OMPI_SUCCESS;
/* account for our refcount on pmix_init */
opal_pmix.finalize();
return ret;
}
3 changes: 0 additions & 3 deletions ompi/runtime/ompi_mpi_finalize.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ int ompi_mpi_finalize(void)
}
}

/* account for our refcount on pmix_init */
opal_pmix.finalize();

/* check for timing request - get stop time and report elapsed
time if so */
//OPAL_TIMING_DELTAS(ompi_enable_timing, &tm);
Expand Down
122 changes: 121 additions & 1 deletion opal/mca/pmix/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include "opal_config.h"
#include "opal/types.h"

#include "opal/threads/threads.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_framework.h"

Expand Down Expand Up @@ -55,13 +55,133 @@ OPAL_DECLSPEC int opal_pmix_base_exchange(opal_value_t *info,

OPAL_DECLSPEC void opal_pmix_base_set_evbase(opal_event_base_t *evbase);

/* Condition-variable layer for the PMIx glue: a direct alias of the POSIX
 * condition variable type plus one-to-one wrappers around its operations. */
typedef pthread_cond_t opal_pmix_condition_t;
#define OPAL_PMIX_CONDITION_STATIC_INIT     PTHREAD_COND_INITIALIZER
/* (a) is the condition to wait on; (b) points to an object that carries the
 * underlying pthread mutex in its m_lock_pthread member */
#define opal_pmix_condition_wait(a,b)       pthread_cond_wait((a), &(b)->m_lock_pthread)
#define opal_pmix_condition_signal(a)       pthread_cond_signal((a))
#define opal_pmix_condition_broadcast(a)    pthread_cond_broadcast((a))

/* Lock object used by the OPAL_PMIX_*_THREAD macros in this header: a mutex
 * and condition variable paired with an "active" flag that waiters block on
 * for as long as it is true. */
typedef struct {
/* protects the flag and is the mutex handed to the condition wait */
opal_mutex_t mutex;
/* broadcast whenever the flag is cleared */
opal_pmix_condition_t cond;
/* true while the lock is held / the awaited operation is still pending */
volatile bool active;
} opal_pmix_lock_t;


/* Framework-wide state of the PMIx base, shared through the single global
 * instance declared below. */
typedef struct {
/* event base pointer; presumably the one installed through
 * opal_pmix_base_set_evbase() - TODO confirm */
opal_event_base_t *evbase;
/* NOTE(review): units and meaning of the timeout are not visible here */
int timeout;
/* NOTE(review): looks like an init flag or reference count - confirm
 * against the component code that updates it */
int initialized;
/* lock object operated on by the OPAL_PMIX_*_THREAD macros */
opal_pmix_lock_t lock;
} opal_pmix_base_t;

/* the one global instance; defined in a .c file of the base */
extern opal_pmix_base_t opal_pmix_base;

/* Initialize the opal_pmix_lock_t pointed to by (l). The lock comes up with
 * its "active" flag already set, so a thread that waits on it blocks until
 * the flag is cleared. */
#define OPAL_PMIX_CONSTRUCT_LOCK(l)                     \
    do {                                                \
        (l)->active = true;                             \
        pthread_cond_init(&(l)->cond, NULL);            \
        OBJ_CONSTRUCT(&(l)->mutex, opal_mutex_t);       \
    } while (0)

/* Tear down an opal_pmix_lock_t set up with OPAL_PMIX_CONSTRUCT_LOCK:
 * destroys its condition variable and destructs its mutex. The "active"
 * flag is left untouched. */
#define OPAL_PMIX_DESTRUCT_LOCK(l)                      \
    do {                                                \
        pthread_cond_destroy(&(l)->cond);               \
        OBJ_DESTRUCT(&(l)->mutex);                      \
    } while (0)


/* Take exclusive ownership of the lock pointed to by (lck).
 *
 * Locks the mutex, waits on the condition for as long as "active" is true,
 * then sets "active" to true to mark the lock as taken. The macro does NOT
 * unlock the mutex: it is still locked when the macro completes, and
 * OPAL_PMIX_RELEASE_THREAD is the macro that clears the flag and unlocks it.
 *
 * The debug variant does the same work and additionally prints a
 * "Waiting for thread" / "Thread obtained" trace with the call site when
 * opal_debug_threads is set. */
#if OPAL_ENABLE_DEBUG
#define OPAL_PMIX_ACQUIRE_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
if (opal_debug_threads) { \
opal_output(0, "Waiting for thread %s:%d", \
__FILE__, __LINE__); \
} \
while ((lck)->active) { \
opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex); \
} \
if (opal_debug_threads) { \
opal_output(0, "Thread obtained %s:%d", \
__FILE__, __LINE__); \
} \
(lck)->active = true; \
} while(0)
#else
#define OPAL_PMIX_ACQUIRE_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
while ((lck)->active) { \
opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex); \
} \
(lck)->active = true; \
} while(0)
#endif


/* Block the calling thread until the "active" flag of the lock pointed to
 * by (lck) has been cleared (OPAL_PMIX_WAKEUP_THREAD is the macro in this
 * header that clears it and broadcasts the condition).
 *
 * Unlike OPAL_PMIX_ACQUIRE_THREAD this neither sets the flag again nor
 * keeps the mutex: the mutex is locked only for the duration of the wait
 * and is unlocked before the macro completes. OPAL_ACQUIRE_OBJECT is
 * invoked on the lock after the wait and before the unlock.
 *
 * Both variants pass (lck) itself to OPAL_ACQUIRE_OBJECT. The debug variant
 * used to pass &lck, which disagreed with the non-debug variant and turned
 * into "&&x" whenever the caller wrote OPAL_PMIX_WAIT_THREAD(&x).
 *
 * The debug variant additionally prints a "Waiting for thread" /
 * "Thread obtained" trace with the call site when opal_debug_threads is
 * set. */
#if OPAL_ENABLE_DEBUG
#define OPAL_PMIX_WAIT_THREAD(lck)                                  \
    do {                                                            \
        opal_mutex_lock(&(lck)->mutex);                             \
        if (opal_debug_threads) {                                   \
            opal_output(0, "Waiting for thread %s:%d",              \
                        __FILE__, __LINE__);                        \
        }                                                           \
        while ((lck)->active) {                                     \
            opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex);  \
        }                                                           \
        if (opal_debug_threads) {                                   \
            opal_output(0, "Thread obtained %s:%d",                 \
                        __FILE__, __LINE__);                        \
        }                                                           \
        OPAL_ACQUIRE_OBJECT(lck);                                   \
        opal_mutex_unlock(&(lck)->mutex);                           \
    } while(0)
#else
#define OPAL_PMIX_WAIT_THREAD(lck)                                  \
    do {                                                            \
        opal_mutex_lock(&(lck)->mutex);                             \
        while ((lck)->active) {                                     \
            opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex);  \
        }                                                           \
        OPAL_ACQUIRE_OBJECT(lck);                                   \
        opal_mutex_unlock(&(lck)->mutex);                           \
    } while(0)
#endif


/* Give up the lock pointed to by (lck): clear "active", broadcast the
 * condition so every waiter re-checks the flag, then unlock the mutex.
 *
 * The macro never locks the mutex itself, so the caller must already hold
 * it - presumably from a preceding OPAL_PMIX_ACQUIRE_THREAD, which returns
 * with the mutex locked (verify against callers).
 *
 * The debug variant prints a "Releasing thread" trace with the call site
 * when opal_debug_threads is set. The non-debug variant instead asserts
 * that a trylock on the mutex fails, i.e. that the mutex is already locked.
 *
 * NOTE(review): the assert exists only in the non-debug branch and its
 * argument has a side effect (a successful trylock takes the mutex just
 * before the assertion fails) - confirm this placement is intentional. */
#if OPAL_ENABLE_DEBUG
#define OPAL_PMIX_RELEASE_THREAD(lck) \
do { \
if (opal_debug_threads) { \
opal_output(0, "Releasing thread %s:%d", \
__FILE__, __LINE__); \
} \
(lck)->active = false; \
opal_pmix_condition_broadcast(&(lck)->cond); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)
#else
#define OPAL_PMIX_RELEASE_THREAD(lck) \
do { \
assert(0 != opal_mutex_trylock(&(lck)->mutex)); \
(lck)->active = false; \
opal_pmix_condition_broadcast(&(lck)->cond); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)
#endif


/* Wake every thread blocked on the lock pointed to by (lck).
 *
 * Self-contained counterpart of OPAL_PMIX_WAIT_THREAD: it locks the mutex
 * itself, clears "active", invokes OPAL_POST_OBJECT on the lock, broadcasts
 * the condition and unlocks the mutex again. Because it takes the mutex on
 * its own, the caller must not already hold it. */
#define OPAL_PMIX_WAKEUP_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
(lck)->active = false; \
OPAL_POST_OBJECT(lck); \
opal_pmix_condition_broadcast(&(lck)->cond); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)

END_C_DECLS

#endif
99 changes: 11 additions & 88 deletions opal/mca/pmix/base/pmix_base_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,39 +92,6 @@ int opal_pmix_base_notify_event(int status,
return OPAL_SUCCESS;
}

/* Tracking object handed as cbdata to the non-blocking publish/lookup
 * calls in this file so the caller can poll for completion. */
struct lookup_caddy_t {
/* set to true by the requester before the call; the callback clears it */
volatile bool active;
/* status reported by the callback (requester presets it to -1) */
int status;
/* where a lookup result is stored; NULL when used for a publish */
opal_pmix_pdata_t *pdat;
};

/******** DATA EXCHANGE ********/
/**
 * Completion callback for a non-blocking lookup.
 *
 * Records the reported status in the caddy and, when the lookup succeeded
 * and returned at least one entry, copies the first entry into the
 * requester's pdata: the proc is always copied, the value only when its
 * type matches the type the requester asked for. The caddy's active flag
 * is cleared last, which is what the requester polls on.
 *
 * @param status  result of the lookup operation
 * @param data    list of opal_pmix_pdata_t results; may be NULL or empty
 * @param cbdata  the struct lookup_caddy_t supplied with the request
 */
static void lookup_cbfunc(int status, opal_list_t *data, void *cbdata)
{
    struct lookup_caddy_t *cd = (struct lookup_caddy_t*)cbdata;

    cd->status = status;
    /* opal_list_get_first() returns the list sentinel - never NULL - for
     * an empty list, so emptiness has to be tested explicitly before the
     * first item is treated as a pdata object */
    if (OPAL_SUCCESS == status && NULL != data && !opal_list_is_empty(data)) {
        opal_pmix_pdata_t *p = (opal_pmix_pdata_t*)opal_list_get_first(data);

        cd->pdat->proc = p->proc;
        if (p->value.type == cd->pdat->value.type) {
            /* release the old key and reset the pointer so it cannot be
             * left dangling if the transfer installs no new key */
            free(cd->pdat->value.key);
            cd->pdat->value.key = NULL;
            (void)opal_value_xfer(&cd->pdat->value, &p->value);
        }
    }
    cd->active = false;
}

/* Operation-complete callback: store the reported status in the caddy
 * passed as cbdata, then clear its active flag to signal completion. */
static void opcbfunc(int status, void *cbdata)
{
    struct lookup_caddy_t *caddy = cbdata;

    caddy->status = status;
    caddy->active = false;
}

int opal_pmix_base_exchange(opal_value_t *indat,
opal_pmix_pdata_t *outdat,
int timeout)
Expand All @@ -133,8 +100,6 @@ int opal_pmix_base_exchange(opal_value_t *indat,
opal_list_t ilist, mlist;
opal_value_t *info;
opal_pmix_pdata_t *pdat;
struct lookup_caddy_t caddy;
char **keys;

/* protect the incoming value */
opal_dss.copy((void**)&info, indat, OPAL_VALUE);
Expand All @@ -148,29 +113,10 @@ int opal_pmix_base_exchange(opal_value_t *indat,
opal_list_append(&ilist, &info->super);

/* publish it with "session" scope */
if (NULL == opal_pmix.publish_nb) {
rc = opal_pmix.publish(&ilist);
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != rc) {
return rc;
}
} else {
caddy.status = -1;
caddy.active = true;
caddy.pdat = NULL;
rc = opal_pmix.publish_nb(&ilist, opcbfunc, &caddy);
if (OPAL_SUCCESS != rc) {
OPAL_LIST_DESTRUCT(&ilist);
return rc;
}
while (caddy.active) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != caddy.status) {
OPAL_ERROR_LOG(caddy.status);
return caddy.status;
}
rc = opal_pmix.publish(&ilist);
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != rc) {
return rc;
}

/* lookup the other side's info - if a non-blocking form
Expand Down Expand Up @@ -204,43 +150,20 @@ int opal_pmix_base_exchange(opal_value_t *indat,

/* if a non-blocking version of lookup isn't
* available, then use the blocking version */
if (NULL == opal_pmix.lookup_nb) {
OBJ_CONSTRUCT(&ilist, opal_list_t);
opal_list_append(&ilist, &pdat->super);
rc = opal_pmix.lookup(&ilist, &mlist);
OPAL_LIST_DESTRUCT(&mlist);
OBJ_CONSTRUCT(&ilist, opal_list_t);
opal_list_append(&ilist, &pdat->super);
rc = opal_pmix.lookup(&ilist, &mlist);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != rc) {
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != rc) {
return rc;
}
} else {
caddy.status = -1;
caddy.active = true;
caddy.pdat = pdat;
keys = NULL;
opal_argv_append_nosize(&keys, pdat->value.key);
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
if (OPAL_SUCCESS != rc) {
OPAL_LIST_DESTRUCT(&mlist);
opal_argv_free(keys);
return rc;
}
while (caddy.active) {
usleep(10);
}
opal_argv_free(keys);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != caddy.status) {
OPAL_ERROR_LOG(caddy.status);
return caddy.status;
}
return rc;
}

/* pass back the result */
outdat->proc = pdat->proc;
free(outdat->value.key);
rc = opal_value_xfer(&outdat->value, &pdat->value);
OBJ_RELEASE(pdat);
OPAL_LIST_DESTRUCT(&ilist);
return rc;
}

Expand Down
12 changes: 11 additions & 1 deletion opal/mca/pmix/base/pmix_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "opal/constants.h"

#include "opal/mca/mca.h"
#include "opal/threads/thread_usage.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/mca/base/base.h"
Expand All @@ -35,7 +36,16 @@ opal_pmix_base_module_t opal_pmix = { 0 };
bool opal_pmix_collect_all_data = true;
int opal_pmix_verbose_output = -1;
bool opal_pmix_base_async_modex = false;
opal_pmix_base_t opal_pmix_base = {0};
/* Global PMIx base state. The embedded lock is statically initialized
 * (mutex and condition via their static initializers) and starts with
 * "active" false, i.e. nothing has to be released before it can first be
 * acquired. */
opal_pmix_base_t opal_pmix_base = {
.evbase = NULL,
.timeout = 0,
.initialized = 0,
.lock = {
.mutex = OPAL_MUTEX_STATIC_INIT,
.cond = OPAL_PMIX_CONDITION_STATIC_INIT,
.active = false
}
};

static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
{
Expand Down
2 changes: 1 addition & 1 deletion opal/mca/pmix/pmix.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ extern int opal_pmix_base_exchange(opal_value_t *info,
OPAL_NAME_PRINT(*(p)), (s))); \
OBJ_CONSTRUCT(&(_ilist), opal_list_t); \
_info = OBJ_NEW(opal_value_t); \
_info->key = strdup(OPAL_PMIX_OPTIONAL); \
_info->key = strdup(OPAL_PMIX_IMMEDIATE); \
_info->type = OPAL_BOOL; \
_info->data.flag = true; \
opal_list_append(&(_ilist), &(_info)->super); \
Expand Down
15 changes: 12 additions & 3 deletions opal/mca/pmix/pmix2x/pmix/AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,31 @@ Email Name Affiliation(s)
alinask Elena Shipunova Mellanox
annu13 Annapurna Dasari Intel
artpol84 Artem Polyakov Mellanox
ashleypittman Ashley Pittman Intel
dsolt Dave Solt IBM
garlick Jim Garlick LLNL
ggouaillardet Gilles Gouaillardet RIST
hjelmn Nathan Hjelm LANL
igor-ivanov Igor Ivanov Mellanox
jladd-mlnx Joshua Ladd Mellanox
jsquyres Jeff Squyres Cisco, IU
jjhursey Joshua Hursey IBM
jsquyres Jeff Squyres Cisco
karasevb Boris Karasev Mellanox
kawashima-fj Takahiro Kawashima Fujitsu
nkogteva Nadezhda Kogteva Mellanox
rhc54 Ralph Castain LANL, Cisco, Intel
nysal Nysal Jan KA IBM
PHHargrove Paul Hargrove LBNL
rhc54 Ralph Castain Intel
------------------------------- --------------------------- -------------------

Affiliation abbreviations:
--------------------------
Cisco = Cisco Systems, Inc.
Fujitsu = Fujitsu
IBM = International Business Machines, Inc.
Intel = Intel, Inc.
IU = Indiana University
LANL = Los Alamos National Laboratory
LBNL = Lawrence Berkeley National Laboratory
LLNL = Lawrence Livermore National Laboratory
Mellanox = Mellanox
RIST = Research Organization for Information Science and Technology
Loading

0 comments on commit 814e858

Please sign in to comment.