From 0e51c3ce2912eb1187d3d977278ebfd4621ace24 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Tue, 5 Jul 2016 18:30:11 +0200 Subject: [PATCH 01/10] Remove an apparently useless function. (cherry picked from commit 73972768f834505757bb43b1974ac237d4cf56a5) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_init.c | 8 -------- ompi/communicator/communicator.h | 1 - 2 files changed, 9 deletions(-) diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index b2200bdb71e..f2acab5bbec 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -341,14 +341,6 @@ int ompi_comm_finalize(void) return OMPI_SUCCESS; } -/* - * For linking only. To be checked. - */ -int ompi_comm_link_function(void) -{ - return OMPI_SUCCESS; -} - /********************************************************************************/ /********************************************************************************/ /********************************************************************************/ diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 57ef9977fce..455defb9cc5 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -351,7 +351,6 @@ static inline bool ompi_comm_peer_invalid(ompi_communicator_t* comm, int peer_id * Initialise MPI_COMM_WORLD and MPI_COMM_SELF */ int ompi_comm_init(void); -OMPI_DECLSPEC int ompi_comm_link_function(void); /** * extract the local group from a communicator From 94d6db096d3aa2e015ddd6233dfd5b5bc848a21a Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Fri, 8 Jul 2016 10:46:05 -0600 Subject: [PATCH 02/10] ompi/comm: refactor communicator cid code This commit simplifies the communicator context ID generation by removing the blocking code. The high level calls: ompi_comm_nextcid and ompi_comm_activate remain but now call the non-blocking variants and wait on the resulting request. This was done to remove the parallel paths for context ID generation in preperation for further improvements of the CID generation code. Signed-off-by: Nathan Hjelm (cherry picked from commit 035c2e2e2aefbd36a200a0a1c4262fde959de462) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm.c | 158 +-- ompi/communicator/comm_cid.c | 1594 ++++++++++++------------------ ompi/communicator/comm_init.c | 8 - ompi/communicator/comm_request.c | 5 +- ompi/communicator/comm_request.h | 2 +- ompi/communicator/communicator.h | 71 +- ompi/dpm/dpm.c | 30 +- ompi/mpi/c/intercomm_create.c | 22 +- ompi/mpi/c/intercomm_merge.c | 23 +- 9 files changed, 718 insertions(+), 1195 deletions(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index c1997d3841f..342e5028afe 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -358,13 +358,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -374,13 +368,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, newcomp->c_contextid, comm->c_contextid ); /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -609,13 +597,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -634,36 +616,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); exit: - if ( NULL != results ) { - free ( results ); - } - if ( NULL != sorted ) { - free ( sorted ); - } - if ( NULL != rresults) { - free ( rresults ); - } - if ( NULL != rsorted ) { - free ( rsorted ); - } - if ( NULL != lranks ) { - free ( lranks ); - } - if ( NULL != rranks ) { - free ( rranks ); - } + free ( results ); + free ( sorted ); + free ( rresults ); + free ( rsorted ); + free ( lranks ); + free ( rranks ); /* Step 4: if we are not part of the comm, free the struct */ /* --------------------------------------------------------- */ @@ -675,7 +636,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } *newcomm = newcomp; - return ( rc ); + return rc; } @@ -925,13 +886,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -950,13 +905,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -1031,13 +980,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1047,13 +990,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1062,11 +999,15 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp return MPI_SUCCESS; } -struct ompi_comm_idup_with_info_context { +struct ompi_comm_idup_with_info_context_t { + opal_object_t super; ompi_communicator_t *comm; ompi_communicator_t *newcomp; }; +typedef struct ompi_comm_idup_with_info_context_t ompi_comm_idup_with_info_context_t; +OBJ_CLASS_INSTANCE(ompi_comm_idup_with_info_context_t, opal_object_t, NULL, NULL); + static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request); static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request); static int ompi_comm_idup_getcid (ompi_comm_request_t *request); @@ -1085,7 +1026,7 @@ int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *group, ompi_group_t *remote_group, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req) { - struct ompi_comm_idup_with_info_context *context; + ompi_comm_idup_with_info_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq[1]; int rc; @@ -1101,7 +1042,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); + context = OBJ_NEW(ompi_comm_idup_with_info_context_t); if (NULL == context) { ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; @@ -1109,7 +1050,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro context->comm = comm; - request->context = context; + request->context = &context->super; rc = ompi_comm_set_nb (&context->newcomp, /* new comm */ comm, /* old comm */ @@ -1142,8 +1083,8 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro static int ompi_comm_idup_getcid (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1154,11 +1095,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid_nb (context->newcomp, /* new communicator */ - context->comm, /* old comm */ - NULL, /* bridge comm */ - mode, /* mode */ - subreq); /* new subrequest */ + rc = ompi_comm_nextcid_nb (context->newcomp, context->comm, NULL, NULL, + NULL, false, mode, subreq); if (OMPI_SUCCESS != rc) { ompi_comm_request_return (request); return rc; @@ -1171,8 +1109,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1187,7 +1125,7 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) context->newcomp->c_contextid, context->comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, subreq); + rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, NULL, NULL, false, mode, subreq); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1233,13 +1171,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - newcomp, /* bridge comm (used to pass the group into the group allreduce) */ - &tag, /* user defined tag */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1249,13 +1181,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - newcomp, - &tag, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1924,13 +1850,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, int ret = OMPI_SUCCESS; /* Determine context id. It is identical to f_2_c_handle */ - ret = ompi_comm_nextcid ( new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ + ret = ompi_comm_nextcid (new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; @@ -1953,15 +1874,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, goto complete_and_return; } - ret = ompi_comm_activate( &new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ - - + ret = ompi_comm_activate (&new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index f0c332b5dc1..ab9c9961198 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -14,7 +14,7 @@ * Copyright (c) 2007 Voltaire All rights reserved. * Copyright (c) 2006-2010 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. @@ -44,7 +44,90 @@ #include "ompi/request/request.h" #include "ompi/runtime/mpiruntime.h" -BEGIN_C_DECLS +struct ompi_comm_cid_context_t; + +typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + struct ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); + + +struct ompi_comm_cid_context_t { + opal_object_t super; + + ompi_communicator_t *newcomm; + ompi_communicator_t **newcommp; + ompi_communicator_t *comm; + ompi_communicator_t *bridgecomm; + + ompi_comm_allreduce_impl_fn_t allreduce_fn; + + int nextcid; + int nextlocal_cid; + int start; + int flag, rflag; + int local_leader; + int remote_leader; + int iter; + /** storage for activate barrier */ + int ok; + char *port_string; + bool send_first; + int pml_tag; + char *pmix_tag; +}; + +typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t; + +static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context) +{ + free (context->port_string); + free (context->pmix_tag); +} + +OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t, + mca_comm_cid_context_construct, + mca_comm_cid_context_destruct); + +struct ompi_comm_allreduce_context_t { + opal_object_t super; + + int *inbuf; + int *outbuf; + int count; + struct ompi_op_t *op; + ompi_comm_cid_context_t *cid_context; + int *tmpbuf; + + /* for intercomm allreduce */ + int *rcounts; + int *rdisps; + + /* for group allreduce */ + int peers_comm[3]; +}; + +typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t; + +static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context) +{ + free (context->tmpbuf); + free (context->rcounts); + free (context->rdisps); +} + +OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t, + ompi_comm_allreduce_context_construct, + ompi_comm_allreduce_context_destruct); /** * These functions make sure, that we determine the global result over @@ -53,90 +136,29 @@ BEGIN_C_DECLS * and a bridge-comm (intercomm-create scenario). */ - -typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_ledaer, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - /* non-blocking intracommunicator allreduce */ -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); /* non-blocking intercommunicator allreduce */ -static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static int ompi_comm_register_cid (uint32_t contextid); -static int ompi_comm_unregister_cid (uint32_t contextid); -static uint32_t ompi_comm_lowest_cid ( void ); - -struct ompi_comm_reg_t{ - opal_list_item_t super; - uint32_t cid; -}; -typedef struct ompi_comm_reg_t ompi_comm_reg_t; -OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_comm_reg_t); - -static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom); -static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom); +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -OBJ_CLASS_INSTANCE (ompi_comm_reg_t, - opal_list_item_t, - ompi_comm_reg_constructor, - ompi_comm_reg_destructor ); +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static opal_mutex_t ompi_cid_lock; -static opal_list_t ompi_registered_comms; +static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT; int ompi_comm_cid_init (void) @@ -144,239 +166,164 @@ int ompi_comm_cid_init (void) return OMPI_SUCCESS; } -int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, int send_first ) +static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, const char *pmix_tag, bool send_first, + int mode) { + ompi_comm_cid_context_t *context; + ompi_comm_request_t *request; int ret; - int nextcid; - bool flag; - int nextlocal_cid; - int done=0; - int response, glresponse=0; - int start; - unsigned int i; - int iter=0; - ompi_comm_cid_allredfct* allredfnct; - - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; + context = OBJ_NEW(ompi_comm_cid_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - start = ompi_mpi_communicators.lowest_free; - - while (!done) { - /** - * This is the real algorithm described in the doc - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - continue; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - nextlocal_cid = mca_pml.pml_max_contextid; - flag = false; - for (i=start; i < mca_pml.pml_max_contextid ; i++) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, comm); - if (true == flag) { - nextlocal_cid = i; - break; - } - } - - ret = (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - goto release_and_return; - } + context->newcomm = newcomm; + context->comm = comm; + context->bridgecomm = bridgecomm; - if (mca_pml.pml_max_contextid == (unsigned int) nextcid) { - /* at least one peer ran out of CIDs */ - if (flag) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto release_and_return; - } + /* Determine which implementation of allreduce we have to use + * for the current mode. */ + switch (mode) { + case OMPI_COMM_CID_INTRA: + context->allreduce_fn = ompi_comm_allreduce_intra_nb; + break; + case OMPI_COMM_CID_INTER: + context->allreduce_fn = ompi_comm_allreduce_inter_nb; + break; + case OMPI_COMM_CID_GROUP: + context->allreduce_fn = ompi_comm_allreduce_group_nb; + context->pml_tag = ((int *) arg0)[0]; + break; + case OMPI_COMM_CID_INTRA_PMIX: + context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb; + context->local_leader = ((int *) arg0)[0]; + if (arg1) { + context->port_string = strdup ((char *) arg1); } + context->pmix_tag = strdup ((char *) pmix_tag); + break; + case OMPI_COMM_CID_INTRA_BRIDGE: + context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb; + context->local_leader = ((int *) arg0)[0]; + context->remote_leader = ((int *) arg1)[0]; + break; + default: + OBJ_RELEASE(context); + return NULL; + } + + context->send_first = send_first; + context->iter = 0; + + return context; +} - if (nextcid == nextlocal_cid) { - response = 1; /* fine with me */ - } - else { - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextlocal_cid, NULL); - - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - nextcid, comm ); - if (true == flag) { - response = 1; /* works as well */ - } - else { - response = 0; /* nope, not acceptable */ - } - } +static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context) +{ + ompi_comm_allreduce_context_t *context; - ret = (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextcid, NULL); - goto release_and_return; - } - if (1 == glresponse) { - done = 1; /* we are done */ - break; - } - else if ( 0 == glresponse ) { - if ( 1 == response ) { - /* we could use that, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextcid, NULL); - } - start = nextcid+1; /* that's where we can start the next round */ - } + context = OBJ_NEW(ompi_comm_allreduce_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - /* set the according values to the newcomm */ - newcomm->c_contextid = nextcid; - opal_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); - - release_and_return: - ompi_comm_unregister_cid (comm->c_contextid); + context->inbuf = inbuf; + context->outbuf = outbuf; + context->count = count; + context->op = op; + context->cid_context = cid_context; - return ret; + return context; } -/* Non-blocking version of ompi_comm_nextcid */ -struct mca_comm_nextcid_context { - ompi_communicator_t* newcomm; - ompi_communicator_t* comm; - ompi_communicator_t* bridgecomm; - int mode; - int nextcid; - int nextlocal_cid; - int start; - int flag, rflag; -}; - /* find the next available local cid and start an allreduce */ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); /* verify that the maximum cid is locally available and start an allreduce */ static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); +/* lock the cid generator */ +static int ompi_comm_cid_lock (ompi_comm_request_t *request); -int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req) { - struct mca_comm_nextcid_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; int ret; - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; - } - - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1, + "nextcid", send_first, mode); if (NULL == context) { - ompi_comm_unregister_cid (comm->c_contextid); return OMPI_ERR_OUT_OF_RESOURCE; } + context->start = ompi_mpi_communicators.lowest_free; + request = ompi_comm_request_get (); if (NULL == request) { - ompi_comm_unregister_cid (comm->c_contextid); - free (context); + OBJ_RELEASE(context); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; - context->bridgecomm = bridgecomm; - context->mode = mode; - context->start = ompi_mpi_communicators.lowest_free; - - request->context = context; + request->context = &context->super; - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); ompi_comm_request_start (request); *req = &request->super; + return OMPI_SUCCESS; } +int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + +static int ompi_comm_cid_lock (ompi_comm_request_t *request) +{ + if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); +} + static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; - unsigned int i; bool flag; int ret; /** * This is the real algorithm described in the doc */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (context->comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); - - return OMPI_SUCCESS; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; - for (i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { + for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, context->comm); if (true == flag) { @@ -385,15 +332,10 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) } } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } - + ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, + context, &subreq); if (OMPI_SUCCESS != ret) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -403,18 +345,17 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } /* next we want to verify that the resulting commid is ok */ - ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); - - return OMPI_SUCCESS; + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); } static int ompi_comm_checkcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; int ret; @@ -423,37 +364,31 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) if (!context->flag) { opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); - context->flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - context->nextcid, context->comm); + context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, + context->nextcid, context->comm); } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } + ++context->iter; - if (OMPI_SUCCESS != ret) { - return ret; + ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MAX, context, &subreq); + if (OMPI_SUCCESS == ret) { + ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } - ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); - - return OMPI_SUCCESS; + return ret; } static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); - ompi_comm_unregister_cid (context->comm->c_contextid); + /* unlock the cid generator */ + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! */ return OMPI_SUCCESS; @@ -461,118 +396,16 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) if (1 == context->flag) { /* we could use this cid, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL); + opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL); context->start = context->nextcid + 1; /* that's where we can start the next round */ } + ++context->iter; + /* try again */ return ompi_comm_allreduce_getnextcid (request); } -/**************************************************************************/ -/**************************************************************************/ -/**************************************************************************/ -static void ompi_comm_reg_constructor (ompi_comm_reg_t *regcom) -{ - regcom->cid=MPI_UNDEFINED; -} - -static void ompi_comm_reg_destructor (ompi_comm_reg_t *regcom) -{ -} - -void ompi_comm_reg_init (void) -{ - OBJ_CONSTRUCT(&ompi_registered_comms, opal_list_t); - OBJ_CONSTRUCT(&ompi_cid_lock, opal_mutex_t); -} - -void ompi_comm_reg_finalize (void) -{ - OBJ_DESTRUCT(&ompi_registered_comms); - OBJ_DESTRUCT(&ompi_cid_lock); -} - - -static int ompi_comm_register_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t); - bool registered = false; - - do { - /* Only one communicator function allowed in same time on the - * same communicator. - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - - newentry->cid = cid; - if ( !(opal_list_is_empty (&ompi_registered_comms)) ) { - bool ok = true; - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if ( regcom->cid > cid ) { - break; - } -#if OMPI_ENABLE_THREAD_MULTIPLE - if( regcom->cid == cid ) { - /** - * The MPI standard state that is the user responsability to - * schedule the global communications in order to avoid any - * kind of troubles. As, managing communicators involve several - * collective communications, we should enforce a sequential - * execution order. This test only allow one communicator - * creation function based on the same communicator. - */ - ok = false; - break; - } -#endif /* OMPI_ENABLE_THREAD_MULTIPLE */ - } - if (ok) { - opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *) regcom, - (opal_list_item_t *)newentry); - registered = true; - } - } else { - opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry); - registered = true; - } - - /* drop the lock before trying again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - } while (!registered); - - return OMPI_SUCCESS; -} - -static int ompi_comm_unregister_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - - OPAL_THREAD_LOCK(&ompi_cid_lock); - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if(regcom->cid == cid) { - opal_list_remove_item(&ompi_registered_comms, (opal_list_item_t *) regcom); - OBJ_RELEASE(regcom); - break; - } - } - - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - - return OMPI_SUCCESS; -} - -static uint32_t ompi_comm_lowest_cid (void) -{ - ompi_comm_reg_t *regcom=NULL; - opal_list_item_t *item=opal_list_get_first (&ompi_registered_comms); - - regcom = (ompi_comm_reg_t *)item; - return regcom->cid; -} /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ @@ -590,174 +423,43 @@ static uint32_t ompi_comm_lowest_cid (void) * comm.c is, that this file contains the allreduce implementations * which are required, and thus we avoid having duplicate code... */ -int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ) -{ - int ret = 0; - - int ok=0, gok=0; - ompi_comm_cid_allredfct* allredfnct; - - /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - - if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { - /* Initialize the PML stuff in the newcomm */ - if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { - goto bail_on_error; - } - - OMPI_COMM_SET_PML_ADDED(*newcomm); - } - - - ret = (allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "activate", 0 ); - if( OMPI_SUCCESS != ret ) { - goto bail_on_error; - } - - - - /** - * Check to see if this process is in the new communicator. - * - * Specifically, this function is invoked by all proceses in the - * old communicator, regardless of whether they are in the new - * communicator or not. This is because it is far simpler to use - * MPI collective functions on the old communicator to determine - * some data for the new communicator (e.g., remote_leader) than - * to kludge up our own pseudo-collective routines over just the - * processes in the new communicator. Hence, *all* processes in - * the old communicator need to invoke this function. - * - * That being said, only processes in the new communicator need to - * select a coll module for the new communicator. More - * specifically, proceses who are not in the new communicator - * should *not* select a coll module -- for example, - * ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who - * are not in the new communicator. This can cause errors in the - * selection / initialization of a coll module. Plus, it's - * wasteful -- processes in the new communicator will end up - * freeing the new communicator anyway, so we might as well leave - * the coll selection as NULL (the coll base comm unselect code - * handles that case properly). - */ - if (MPI_UNDEFINED == (*newcomm)->c_local_group->grp_my_rank) { - return OMPI_SUCCESS; - } - - /* Let the collectives components fight over who will do - collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*newcomm))) { - goto bail_on_error; - } - - /* For an inter communicator, we have to deal with the potential - * problem of what is happening if the local_comm that we created - * has a lower CID than the parent comm. This is not a problem - * as long as the user calls MPI_Comm_free on the inter communicator. - * However, if the communicators are not freed by the user but released - * by Open MPI in MPI_Finalize, we walk through the list of still available - * communicators and free them one by one. Thus, local_comm is freed before - * the actual inter-communicator. However, the local_comm pointer in the - * inter communicator will still contain the 'previous' address of the local_comm - * and thus this will lead to a segmentation violation. In order to prevent - * that from happening, we increase the reference counter local_comm - * by one if its CID is lower than the parent. We cannot increase however - * its reference counter if the CID of local_comm is larger than - * the CID of the inter communicators, since a regular MPI_Comm_free would - * leave in that the case the local_comm hanging around and thus we would not - * recycle CID's properly, which was the reason and the cause for this trouble. - */ - if ( OMPI_COMM_IS_INTER(*newcomm)) { - if ( OMPI_COMM_CID_IS_LOWER(*newcomm, comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*newcomm); - OBJ_RETAIN (*newcomm); - } - } - - - return OMPI_SUCCESS; - - bail_on_error: - OBJ_RELEASE(*newcomm); - *newcomm = MPI_COMM_NULL; - return ret; -} /* Non-blocking version of ompi_comm_activate */ -struct ompi_comm_activate_nb_context { - ompi_communicator_t **newcomm; - ompi_communicator_t *comm; - - /* storage for activate barrier */ - int ok; -}; - static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request); -int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req) { - struct ompi_comm_activate_nb_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; + int local_leader; + int remote_leader; int ret = 0; - request = ompi_comm_request_get (); - if (NULL == request) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate", + send_first, mode); if (NULL == context) { - ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; + /* keep track of the pointer so it can be set to MPI_COMM_NULL on failure */ + context->newcommp = newcomm; - request->context = context; - - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { /* Initialize the PML stuff in the newcomm */ if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { OBJ_RELEASE(newcomm); + OBJ_RELEASE(context); *newcomm = MPI_COMM_NULL; return ret; } @@ -765,16 +467,10 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, } /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - if (mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } - + * send messages over the new communicator + */ + ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MAX, context, + &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); return ret; @@ -788,10 +484,28 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, return OMPI_SUCCESS; } +int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) { - struct ompi_comm_activate_nb_context *context = - (struct ompi_comm_activate_nb_context *) request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; int ret; /** @@ -818,15 +532,15 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * the coll selection as NULL (the coll base comm unselect code * handles that case properly). */ - if (MPI_UNDEFINED == (*context->newcomm)->c_local_group->grp_my_rank) { + if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) { return OMPI_SUCCESS; } /* Let the collectives components fight over who will do collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*context->newcomm))) { - OBJ_RELEASE(*context->newcomm); - *context->newcomm = MPI_COMM_NULL; + if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) { + OBJ_RELEASE(context->newcomm); + *context->newcommp = MPI_COMM_NULL; return ret; } @@ -847,10 +561,10 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * leave in that the case the local_comm hanging around and thus we would not * recycle CID's properly, which was the reason and the cause for this trouble. */ - if (OMPI_COMM_IS_INTER(*context->newcomm)) { - if (OMPI_COMM_CID_IS_LOWER(*context->newcomm, context->comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*context->newcomm); - OBJ_RETAIN (*context->newcomm); + if (OMPI_COMM_IS_INTER(context->newcomm)) { + if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) { + OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm); + OBJ_RETAIN (context->newcomm); } } @@ -861,207 +575,47 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *context, ompi_request_t **req) { - return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm, - comm->c_coll.coll_allreduce_module ); -} + ompi_communicator_t *comm = context->comm; -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - ompi_request_t **req) -{ return comm->c_coll.coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm, req, comm->c_coll.coll_iallreduce_module); } - -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) -{ - int local_rank, rsize; - int rc; - int *sbuf; - int *tmpbuf=NULL; - int *rcounts=NULL, scount=0; - int *rdisps=NULL; - - if ( !OMPI_COMM_IS_INTER (intercomm)) { - return MPI_ERR_COMM; - } - - /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); - local_rank = ompi_comm_rank ( intercomm ); - - tmpbuf = (int *) malloc ( count * sizeof(int)); - rdisps = (int *) calloc ( rsize, sizeof(int)); - rcounts = (int *) calloc ( rsize, sizeof(int) ); - if ( OPAL_UNLIKELY (NULL == tmpbuf || NULL == rdisps || NULL == rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - /* Execute the inter-allreduce: the result of our group will - be in the buffer of the remote group */ - rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, intercomm, - intercomm->c_coll.coll_allreduce_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - if ( 0 == local_rank ) { - MPI_Request req; - - /* for the allgatherv later */ - scount = count; - - /* local leader exchange their data and determine the overall result - for both groups */ - rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - intercomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, - intercomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait ( &req, MPI_STATUS_IGNORE ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); - } - - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. - */ - rcounts[0] = count; - sbuf = outbuf; - rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf, - rcounts, rdisps, MPI_INT, - intercomm, - intercomm->c_coll.coll_allgatherv_module); - - exit: - if ( NULL != tmpbuf ) { - free ( tmpbuf ); - } - if ( NULL != rcounts ) { - free ( rcounts ); - } - if ( NULL != rdisps ) { - free ( rdisps ); - } - - return (rc); -} - /* Non-blocking version of ompi_comm_allreduce_inter */ -struct ompi_comm_allreduce_inter_context { - int *inbuf; - int *outbuf; - int count; - struct ompi_op_t *op; - ompi_communicator_t *intercomm; - ompi_communicator_t *bridgecomm; - int *tmpbuf; - int *rcounts; - int *rdisps; -}; - -static void ompi_comm_allreduce_inter_context_free (struct ompi_comm_allreduce_inter_context *context) -{ - if (context->tmpbuf) { - free (context->tmpbuf); - } - - if (context->rdisps) { - free (context->rdisps); - } - - if (context->rcounts) { - free (context->rcounts); - } - - free (context); -} - static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request); -/* Arguments not used in this implementation: - * - bridgecomm - */ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, + ompi_comm_cid_context_t *cid_context, ompi_request_t **req) { - struct ompi_comm_allreduce_inter_context *context = NULL; - ompi_comm_request_t *request = NULL; + ompi_communicator_t *intercomm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; ompi_request_t *subreq; int local_rank, rsize, rc; - if (!OMPI_COMM_IS_INTER (intercomm)) { + if (!OMPI_COMM_IS_INTER (cid_context->comm)) { return MPI_ERR_COMM; } request = ompi_comm_request_get (); - if (NULL == request) { + if (OPAL_UNLIKELY(NULL == request)) { return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); - if (NULL == context) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - context->inbuf = inbuf; - context->outbuf = outbuf; - context->count = count; - context->op = op; - context->intercomm = intercomm; - context->bridgecomm = bridgecomm; + request->context = &context->super; /* Allocate temporary arrays */ rsize = ompi_comm_remote_size (intercomm); @@ -1071,18 +625,17 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, context->rdisps = (int *) calloc (rsize, sizeof(int)); context->rcounts = (int *) calloc (rsize, sizeof(int)); if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - request->context = context; - /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group * and vise-versa. */ rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, &subreq, intercomm->c_coll.coll_iallreduce_module); - if (OMPI_SUCCESS != rc) { - goto exit; + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + ompi_comm_request_return (request); + return rc; } if (0 == local_rank) { @@ -1094,58 +647,37 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, ompi_comm_request_start (request); *req = &request->super; -exit: - if (OMPI_SUCCESS != rc) { - if (context) { - ompi_comm_allreduce_inter_context_free (context); - } - - if (request) { - request->context = NULL; - ompi_comm_request_return (request); - } - } - - return rc; + return OMPI_SUCCESS; } static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreqs[2]; int rc; /* local leader exchange their data and determine the overall result for both groups */ rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - context->intercomm, subreqs)); - if ( OMPI_SUCCESS != rc ) { - goto exit; + intercomm, subreqs)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, context->intercomm, subreqs + 1)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); - -exit: - if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; + MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - return rc; + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); } static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); @@ -1155,8 +687,8 @@ static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreq; int scount = 0, rc; @@ -1167,245 +699,369 @@ static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) the result first. */ - if (0 != ompi_comm_rank (context->intercomm)) { + if (0 != ompi_comm_rank (intercomm)) { context->rcounts[0] = context->count; } else { scount = context->count; } - rc = context->intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, - context->rcounts, context->rdisps, MPI_INT, - context->intercomm, &subreq, - context->intercomm->c_coll.coll_iallgatherv_module); + rc = intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, + context->rcounts, context->rdisps, MPI_INT, intercomm, + &subreq, intercomm->c_coll.coll_iallgatherv_module); if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; return rc; } - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather_complete, &subreq, 1); + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); +} - return OMPI_SUCCESS; +static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *comm = context->cid_context->comm; + ompi_request_t *subreq; + int rc; + + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, + context->cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ibcast_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); } -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request) +static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request) { - /* free this request's context */ - ompi_comm_allreduce_inter_context_free (request->context); - /* prevent a double-free from the progress engine */ - request->context = NULL; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; - /* done */ - return OMPI_SUCCESS; + /* step 3: reduce leader data */ + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); + + /* schedule the broadcast to local peers */ + return ompi_comm_allreduce_bridged_schedule_bcast (request); } -/* Arguments not used in this implementation: - * - send_first - */ -static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bcomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm; + ompi_request_t *subreq[2]; + int rc; + + /* step 2: leader exchange */ + rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm, + subreq)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2); +} + +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - int *tmpbuf=NULL; - int local_rank; + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; int i; int rc; - int local_leader, remote_leader; - local_leader = (*((int*)lleader)); - remote_leader = (*((int*)rleader)); + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (local_rank == cid_context->local_leader) { + context->tmpbuf = (int *) calloc (count, sizeof (int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } - if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op && - &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { - return MPI_ERR_OP; + request = ompi_comm_request_get (); + if (OPAL_UNLIKELY(NULL == request)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + request->context = &context->super; + + if (cid_context->local_leader == local_rank) { + memcpy (context->tmpbuf, inbuf, count * sizeof (int)); } - /* Intercomm_create */ - rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, comm, comm->c_coll.coll_allreduce_module ); + /* step 1: reduce to the local leader */ + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, &subreq, + comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - MPI_Request req; + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - bcomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, bcomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait( &req, MPI_STATUS_IGNORE); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - if ( &ompi_mpi_op_max.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] > outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_min.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] < outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_sum.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] += tmpbuf[i]; - } - } - else if ( &ompi_mpi_op_prod.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] *= tmpbuf[i]; - } - } + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; } - rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, - comm, comm->c_coll.coll_bcast_module ); + ompi_comm_request_start (request); - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); - } + *req = &request->super; - return (rc); + return OMPI_SUCCESS; } -/* Arguments not used in this implementation: - * - bridgecomm - * - * lleader is the local rank of root in comm - * rleader is the port_string - */ -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request) { - int *tmpbuf=NULL; - int rc; - int local_leader, local_rank; - char *port_string; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int32_t size_count = context->count; opal_value_t info; opal_pmix_pdata_t pdat; opal_buffer_t sbuf; - int32_t size_count; + int rc; + + fprintf (stderr, "reduce complete\n"); + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + + if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) { + OBJ_DESTRUCT(&sbuf); + fprintf (stderr, "pack failed. rc %d\n", rc); + return rc; + } + + OBJ_CONSTRUCT(&info, opal_value_t); + OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + + info.type = OPAL_BYTE_OBJECT; + pdat.value.type = OPAL_BYTE_OBJECT; + + opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); + OBJ_DESTRUCT(&sbuf); + + if (cid_context->send_first) { + (void)asprintf(&info.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } else { + (void)asprintf(&info.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } + + fprintf (stderr, "%s, %s\n", info.key, pdat.value.key); + + /* this macro is not actually non-blocking. if a non-blocking version becomes available this function + * needs to be reworked to take advantage of it. */ + OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); + OBJ_DESTRUCT(&info); + fprintf (stderr, "OPAL_PMIX_EXCHANGE returned %d\n", rc); + if (OPAL_SUCCESS != rc) { + OBJ_DESTRUCT(&pdat); + return rc; + } + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); + pdat.value.data.bo.bytes = NULL; + pdat.value.data.bo.size = 0; + OBJ_DESTRUCT(&pdat); + + rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT); + OBJ_DESTRUCT(&sbuf); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + return rc; + } - local_leader = (*((int*)lleader)); - port_string = (char*)rleader; - size_count = count; + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT); - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + return ompi_comm_allreduce_bridged_schedule_bcast (request); +} + +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) +{ + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; + int rc; + + fprintf (stderr, "Calling ompi_comm_allreduce_intra_pmix_nb. rank %d\n", local_rank); + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (cid_context->local_leader == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + /* comm is an intra-communicator */ - rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm, - comm->c_coll.coll_allreduce_module); + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, tmpbuf, (int32_t)count, OPAL_INT))) { - goto exit; - } - OBJ_CONSTRUCT(&info, opal_value_t); - OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - info.type = OPAL_BYTE_OBJECT; - pdat.value.type = OPAL_BYTE_OBJECT; + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; + } - opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); - OBJ_DESTRUCT(&sbuf); + ompi_comm_request_start (request); + *req = (ompi_request_t *) request; - if (send_first) { - (void)asprintf(&info.key, "%s:%s:send:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", port_string, tag, iter); - } else { - (void)asprintf(&info.key, "%s:%s:recv:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:send:%d", port_string, tag, iter); - } + /* use the same function as bridged to schedule the broadcast */ + return OMPI_SUCCESS; +} - OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); - OBJ_DESTRUCT(&info); +static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + ompi_request_t *subreq[2]; + int subreq_count = 0; + int rc; - if (OPAL_SUCCESS != rc) { - OBJ_DESTRUCT(&pdat); - goto exit; + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq + subreq_count++)); + if (OMPI_SUCCESS != rc) { + return rc; + } } - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); - opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); - pdat.value.data.bo.bytes = NULL; - pdat.value.data.bo.size = 0; - OBJ_DESTRUCT(&pdat); + } + + return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count); +} - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&sbuf, outbuf, &size_count, OPAL_INT))) { - OBJ_DESTRUCT(&sbuf); - goto exit; +static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int *tmp = context->tmpbuf; + ompi_request_t *subreq[2]; + int rc; + + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT); + tmp += context->count; } - OBJ_DESTRUCT(&sbuf); - count = (int)size_count; - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); } - rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, - local_leader, comm, - comm->c_coll.coll_bcast_module); + if (MPI_PROC_NULL != context->peers_comm[0]) { + /* interior node */ + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq)); + if (OMPI_SUCCESS != rc) { + return rc; + } + + rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, cid_context->comm, subreq + 1)); + if (OMPI_SUCCESS != rc) { + return rc; + } - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2); } - return (rc); + /* root */ + return ompi_comm_allreduce_group_broadcast (request); } -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *newcomm, - void* local_leader, - void* remote_leader, - int send_first, char *intag, int iter) +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - ompi_group_t *group = newcomm->c_local_group; - int peers_group[3], peers_comm[3]; + ompi_group_t *group = cid_context->newcomm->c_local_group; const int group_size = ompi_group_size (group); const int group_rank = ompi_group_rank (group); - int tag = *((int *) local_leader); - int *tmp1; - int i, rc=OMPI_SUCCESS; + ompi_communicator_t *comm = cid_context->comm; + int peers_group[3], *tmp, subreq_count = 0; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; + ompi_request_t *subreq[3]; + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (NULL == context) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + tmp = context->tmpbuf = calloc (sizeof (int), count * 3); + if (NULL == context->tmpbuf) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request->context = &context->super; /* basic recursive doubling allreduce on the group */ peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL; @@ -1413,54 +1069,28 @@ static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL; /* translate the ranks into the ranks of the parent communicator */ - ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, peers_comm); - - tmp1 = malloc (sizeof (int) * count); + ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm); /* reduce */ memmove (outbuf, inbuf, sizeof (int) * count); - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(recv(tmp1, count, MPI_INT, peers_comm[i], tag, comm, - MPI_STATUS_IGNORE)); + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1], + cid_context->pml_tag, comm, subreq + subreq_count++)); if (OMPI_SUCCESS != rc) { - goto out; + ompi_comm_request_return (request); + return rc; } - /* this is integer reduction so we do not care about ordering */ - ompi_op_reduce (op, tmp1, outbuf, count, MPI_INT); - } - } - - if (MPI_PROC_NULL != peers_comm[0]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[0], - tag, MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - rc = MCA_PML_CALL(recv(outbuf, count, MPI_INT, peers_comm[0], - tag, comm, MPI_STATUS_IGNORE)); - if (OMPI_SUCCESS != rc) { - goto out; + tmp += count; } } - /* broadcast */ - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[i], tag, - MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - } - } + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count); - out: - free (tmp1); + ompi_comm_request_start (request); + *req = &request->super; - return rc; + return OMPI_SUCCESS; } - -END_C_DECLS diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index f2acab5bbec..f453ca1e8e1 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -206,10 +206,6 @@ int ompi_comm_init(void) OBJ_RETAIN(&ompi_mpi_group_null.group); OBJ_RETAIN(&ompi_mpi_errors_are_fatal.eh); - /* initialize the comm_reg stuff for multi-threaded comm_cid - allocation */ - ompi_comm_reg_init(); - /* initialize communicator requests (for ompi_comm_idup) */ ompi_comm_request_init (); @@ -328,13 +324,9 @@ int ompi_comm_finalize(void) } } - OBJ_DESTRUCT (&ompi_mpi_communicators); OBJ_DESTRUCT (&ompi_comm_f_to_c_table); - /* finalize the comm_reg stuff */ - ompi_comm_reg_finalize(); - /* finalize communicator requests */ ompi_comm_request_fini (); diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c index 496de0319aa..f7372e27452 100644 --- a/ompi/communicator/comm_request.c +++ b/ompi/communicator/comm_request.c @@ -234,6 +234,7 @@ static void ompi_comm_request_destruct (ompi_comm_request_t *request) { OBJ_DESTRUCT(&request->schedule); } + OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t, ompi_comm_request_construct, ompi_comm_request_destruct); @@ -257,10 +258,10 @@ ompi_comm_request_t *ompi_comm_request_get (void) void ompi_comm_request_return (ompi_comm_request_t *request) { if (request->context) { - free (request->context); - request->context = NULL; + OBJ_RELEASE (request->context); } + OMPI_REQUEST_FINI(&request->super); opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request); } diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h index 246a3010b0e..65af613f95f 100644 --- a/ompi/communicator/comm_request.h +++ b/ompi/communicator/comm_request.h @@ -22,7 +22,7 @@ typedef struct ompi_comm_request_t { ompi_request_t super; - void *context; + opal_object_t *context; opal_list_t schedule; } ompi_comm_request_t; OBJ_CLASS_DECLARATION(ompi_comm_request_t); diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 455defb9cc5..a8d4756cffa 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -492,24 +492,27 @@ ompi_communicator_t* ompi_comm_allocate (int local_group_size, * @param mode: combination of input * OMPI_COMM_CID_INTRA: intra-comm * OMPI_COMM_CID_INTER: inter-comm + * OMPI_COMM_CID_GROUP: only decide CID within the ompi_group_t + * associated with the communicator. arg0 + * must point to an int which will be used + * as the pml tag for communication. * OMPI_COMM_CID_INTRA_BRIDGE: 2 intracomms connected by - * a bridge comm. local_leader - * and remote leader are in this - * case an int (rank in bridge-comm). + * a bridge comm. arg0 and arg1 must point + * to integers representing the local and + * remote leader ranks. the remote leader rank + * is a rank in the bridgecomm. * OMPI_COMM_CID_INTRA_PMIX: 2 intracomms, leaders talk - * through PMIx. lleader and rleader - * are the required contact information. + * through PMIx. arg0 must point to an integer + * representing the local leader rank. arg1 + * must point to a string representing the + * port of the remote leader. * @param send_first: to avoid a potential deadlock for * the OOB version. * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* oldcomm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first); +OMPI_DECLSPEC int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode); /** * allocate new communicator ID (non-blocking) @@ -521,10 +524,9 @@ OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, * OMPI_COMM_CID_INTER: inter-comm * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req); +OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req); /** * shut down the communicator infrastructure. @@ -617,18 +619,25 @@ int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high ); -OMPI_DECLSPEC int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ); +OMPI_DECLSPEC int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode); -OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req); +/** + * Non-blocking variant of comm_activate. + * + * @param[inout] newcomm New communicator + * @param[in] comm Parent communicator + * @param[in] bridgecomm Bridge communicator (used for PMIX and bridge modes) + * @param[in] arg0 Mode argument 0 + * @param[in] arg1 Mode argument 1 + * @param[in] send_first Send first from this process (PMIX mode only) + * @param[in] mode Collective mode + * @param[out] req New request object to track this operation + */ +OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req); /** * a simple function to dump the structure @@ -638,14 +647,6 @@ int ompi_comm_dump ( ompi_communicator_t *comm ); /* setting name */ int ompi_comm_set_name (ompi_communicator_t *comm, const char *name ); -/* - * these are the init and finalize functions for the comm_reg - * stuff. These routines are necessary for handling multi-threading - * scenarious in the communicator_cid allocation - */ -void ompi_comm_reg_init(void); -void ompi_comm_reg_finalize(void); - /* global variable to save the number od dynamic communicators */ extern int ompi_comm_num_dyncomm; diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index 2da39d920a4..7619e8a219f 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -469,25 +469,25 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, new_group_pointer = MPI_GROUP_NULL; /* allocate comm_cid */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_nextcid ( newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } /* activate comm and init coll-component */ - rc = ompi_comm_activate ( &newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_activate ( &newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } @@ -500,7 +500,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, exit: if (OMPI_SUCCESS != rc) { if (MPI_COMM_NULL != newcomp && NULL != newcomp) { - OBJ_RETAIN(newcomp); + OBJ_RELEASE(newcomp); newcomp = MPI_COMM_NULL; } } diff --git a/ompi/mpi/c/intercomm_create.c b/ompi/mpi/c/intercomm_create.c index b7e948cd624..10f0e8ad2ca 100644 --- a/ompi/mpi/c/intercomm_create.c +++ b/ompi/mpi/c/intercomm_create.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -196,26 +199,15 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader, new_group_pointer = MPI_GROUP_NULL; /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ - + rc = ompi_comm_nextcid (newcomp, local_comm, bridge_comm, &lleader, + &rleader, false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } /* activate comm and init coll-module */ - rc = ompi_comm_activate ( &newcomp, - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, local_comm, bridge_comm, &lleader, &rleader, + false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } diff --git a/ompi/mpi/c/intercomm_merge.c b/ompi/mpi/c/intercomm_merge.c index bcf8f9fec73..55258b637ef 100644 --- a/ompi/mpi/c/intercomm_merge.c +++ b/ompi/mpi/c/intercomm_merge.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -115,26 +118,16 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high, OBJ_RELEASE(new_group_pointer); new_group_pointer = MPI_GROUP_NULL; - /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + /* Determine context id */ + rc = ompi_comm_nextcid (newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } From a993e1d47884e09ddef21b837b752a33aa15138f Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 19 Jul 2016 17:35:34 -0700 Subject: [PATCH 03/10] Silence warnings (cherry picked from commit 36a9063466c2ac71ad768cb2f2424e07288844ed) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) mode change 100644 => 100755 ompi/communicator/comm_cid.c diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c old mode 100644 new mode 100755 index ab9c9961198..b4478b120ed --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -17,7 +17,7 @@ * Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -172,8 +172,6 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t int mode) { ompi_comm_cid_context_t *context; - ompi_comm_request_t *request; - int ret; context = OBJ_NEW(ompi_comm_cid_context_t); if (OPAL_UNLIKELY(NULL == context)) { @@ -256,7 +254,6 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com { ompi_comm_cid_context_t *context; ompi_comm_request_t *request; - int ret; context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1, "nextcid", send_first, mode); @@ -434,8 +431,6 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; - int local_leader; - int remote_leader; int ret = 0; context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate", @@ -777,7 +772,6 @@ static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int local_rank = ompi_comm_rank (comm); ompi_comm_request_t *request; ompi_request_t *subreq; - int i; int rc; context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); From 854e252ba8070da5f8a2bde474c13c2347085277 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 12 Oct 2016 11:53:10 -0600 Subject: [PATCH 04/10] Remove a debug print in comm_cid.c Back-ported from 01a653d50a996f044d8daff54261ac8de3a51de7 Signed-off-by: Nathan Hjelm (cherry picked from commit b8c9f13c2762c459e13e569aaed23b59b0f0fff2) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 1 - 1 file changed, 1 deletion(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index b4478b120ed..0f2415ab94d 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -877,7 +877,6 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques * needs to be reworked to take advantage of it. */ OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); OBJ_DESTRUCT(&info); - fprintf (stderr, "OPAL_PMIX_EXCHANGE returned %d\n", rc); if (OPAL_SUCCESS != rc) { OBJ_DESTRUCT(&pdat); return rc; From 32f15f9a3d7fbbd44c3e877d49bbd38576c5de47 Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Fri, 22 Jul 2016 15:42:56 +0900 Subject: [PATCH 05/10] ompi/communicator: remove an other debug print statement in ompi_comm_allreduce_intra_pmix_nb() (cherry picked from commit bbc6d4b3d44476af49adfa582900f57c4e2323ec) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 0f2415ab94d..f71c9da27da 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -18,7 +18,7 @@ * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. - * Copyright (c) 2014 Research Organization for Information Science + * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * @@ -911,8 +911,6 @@ static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, ompi_request_t *subreq; int rc; - fprintf (stderr, "Calling ompi_comm_allreduce_intra_pmix_nb. rank %d\n", local_rank); - context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); if (OPAL_UNLIKELY(NULL == context)) { return OMPI_ERR_OUT_OF_RESOURCE; From a460a7689e4f56407d5425b29c892ccb0d3015b1 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Mon, 8 Aug 2016 13:20:24 -0700 Subject: [PATCH 06/10] Remove forced debugs (cherry picked from commit ba77d9beff8a5c425926baa7621dc43332f1e973) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index f71c9da27da..8769b27c6a4 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -840,8 +840,6 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques opal_buffer_t sbuf; int rc; - fprintf (stderr, "reduce complete\n"); - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) { @@ -871,8 +869,6 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques cid_context->iter); } - fprintf (stderr, "%s, %s\n", info.key, pdat.value.key); - /* this macro is not actually non-blocking. if a non-blocking version becomes available this function * needs to be reworked to take advantage of it. */ OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); From 2548bfc3bb0e355ffda582d514fc26d5d4003127 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Thu, 18 Aug 2016 08:50:06 -0600 Subject: [PATCH 07/10] comm/cid: fix threaded CID allocation This commit should restore the pre-non-blocking behavior of the CID allocator when threads are used. There are two primary changes: 1) do not hold the cid allocator lock past the end of a request callback, and 2) if a lower id communicator is detected during CID allocation back off and let the lower id communicator finish before continuing. Signed-off-by: Nathan Hjelm (cherry picked from commit fbbf743c368cad6d2422c5712a04f92446880617) --- ompi/communicator/comm_cid.c | 50 +++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 8769b27c6a4..60981e2ba1d 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -181,6 +181,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t context->newcomm = newcomm; context->comm = comm; context->bridgecomm = bridgecomm; + context->pml_tag = 0; /* Determine which implementation of allreduce we have to use * for the current mode. */ @@ -245,8 +246,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); -/* lock the cid generator */ -static int ompi_comm_cid_lock (ompi_comm_request_t *request); + +static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX; int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, @@ -271,7 +272,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com request->context = &context->super; - ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); ompi_comm_request_start (request); *req = &request->super; @@ -299,30 +300,33 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, return rc; } -static int ompi_comm_cid_lock (ompi_comm_request_t *request) -{ - if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { - return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); - } - - return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); -} - static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag); ompi_request_t *subreq; bool flag; int ret; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + if (ompi_comm_cid_lowest_id < my_id) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + ompi_comm_cid_lowest_id = my_id; + /** * This is the real algorithm described in the doc */ flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, context->comm); + flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, + context->comm); if (true == flag) { context->nextlocal_cid = i; break; @@ -332,6 +336,7 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, context, &subreq); if (OMPI_SUCCESS != ret) { + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -341,10 +346,12 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) if (flag) { opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } - + + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* next we want to verify that the resulting commid is ok */ return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); @@ -356,6 +363,10 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ompi_request_t *subreq; int ret; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0); + } + context->flag = (context->nextcid == context->nextlocal_cid); if (!context->flag) { @@ -372,6 +383,8 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return ret; } @@ -379,12 +392,17 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0); + } + if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); /* unlock the cid generator */ + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! */ @@ -399,6 +417,8 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) ++context->iter; + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + /* try again */ return ompi_comm_allreduce_getnextcid (request); } From 671c72f45860a270d975d54247afa004ef2bc060 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 7 Sep 2016 08:25:37 -0600 Subject: [PATCH 08/10] comm/cid: use ibcast to distribute result in intercomm case This commit updates the intercomm allgather to do a local comm bcast as the final step. This should resolve a hang seen in intercomm tests. Signed-off-by: Nathan Hjelm (cherry picked from commit 54cc829aabe7489843e90d47dfed98f52db89993) --- ompi/communicator/comm_cid.c | 52 ++++++++++++------------------------ 1 file changed, 17 insertions(+), 35 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 60981e2ba1d..8ed1a465d49 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -103,10 +103,6 @@ struct ompi_comm_allreduce_context_t { ompi_comm_cid_context_t *cid_context; int *tmpbuf; - /* for intercomm allreduce */ - int *rcounts; - int *rdisps; - /* for group allreduce */ int peers_comm[3]; }; @@ -121,8 +117,6 @@ static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context) { free (context->tmpbuf); - free (context->rcounts); - free (context->rdisps); } OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t, @@ -602,7 +596,7 @@ static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, str /* Non-blocking version of ompi_comm_allreduce_inter */ static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); +static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, @@ -636,18 +630,19 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, rsize = ompi_comm_remote_size (intercomm); local_rank = ompi_comm_rank (intercomm); - context->tmpbuf = (int *) calloc (count, sizeof(int)); - context->rdisps = (int *) calloc (rsize, sizeof(int)); - context->rcounts = (int *) calloc (rsize, sizeof(int)); - if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { - ompi_comm_request_return (request); - return OMPI_ERR_OUT_OF_RESOURCE; + if (0 == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY (NULL == context->tmpbuf)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; + } } /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group * and vise-versa. */ - rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, - &subreq, intercomm->c_coll.coll_iallreduce_module); + rc = intercomm->c_local_comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, 0, + intercomm->c_local_comm, &subreq, + intercomm->c_local_comm->c_coll.coll_ireduce_module); if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { ompi_comm_request_return (request); return rc; @@ -656,7 +651,7 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, if (0 == local_rank) { ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1); } else { - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather, &subreq, 1); + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_bcast, &subreq, 1); } ompi_comm_request_start (request); @@ -696,33 +691,20 @@ static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); - return ompi_comm_allreduce_inter_allgather (request); + return ompi_comm_allreduce_inter_bcast (request); } -static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) +static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request) { ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; - ompi_communicator_t *intercomm = context->cid_context->comm; + ompi_communicator_t *comm = context->cid_context->comm->c_local_comm; ompi_request_t *subreq; int scount = 0, rc; - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. - */ - - if (0 != ompi_comm_rank (intercomm)) { - context->rcounts[0] = context->count; - } else { - scount = context->count; - } - - rc = intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, - context->rcounts, context->rdisps, MPI_INT, intercomm, - &subreq, intercomm->c_coll.coll_iallgatherv_module); + /* both roots have the same result. broadcast to the local group */ + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm, + &subreq, comm->c_coll.coll_ibcast_module); if (OMPI_SUCCESS != rc) { return rc; } From 4af235756edad657668751a9d6f1c8f2871c2e1d Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Fri, 9 Sep 2016 10:10:35 +0900 Subject: [PATCH 09/10] ompi/communicator: fix typos in CID generation use MPI_MIN instead of MPI_MAX when appropriate, otherwise a currently used CID can be reused, and bad things will likely happen. Refs open-mpi/ompi#2061 (cherry picked from commit 3b968ec6bb693c64966d9e596786cc4d5cc7e231) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 8ed1a465d49..a6f44db891b 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -372,7 +372,7 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ++context->iter; - ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MAX, context, &subreq); + ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq); if (OMPI_SUCCESS == ret) { ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } @@ -478,7 +478,7 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c /* Step 1: the barrier, after which it is allowed to * send messages over the new communicator */ - ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MAX, context, + ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context, &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); From aa753a08f5d0ef4e9a2f6780b93110d7dd388bad Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Thu, 6 Oct 2016 15:03:06 +0900 Subject: [PATCH 10/10] ompi/communicator: silence warnings (cherry picked from commit 6c6e35bb40ca600695077f3a3618e9195d2411c9) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index a6f44db891b..d91a427c93b 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -607,7 +607,7 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, ompi_comm_allreduce_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; - int local_rank, rsize, rc; + int local_rank, rc; if (!OMPI_COMM_IS_INTER (cid_context->comm)) { return MPI_ERR_COMM; @@ -627,7 +627,6 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, request->context = &context->super; /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); local_rank = ompi_comm_rank (intercomm); if (0 == local_rank) { @@ -700,12 +699,12 @@ static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request) ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; ompi_communicator_t *comm = context->cid_context->comm->c_local_comm; ompi_request_t *subreq; - int scount = 0, rc; + int rc; /* both roots have the same result. broadcast to the local group */ rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm, &subreq, comm->c_coll.coll_ibcast_module); - if (OMPI_SUCCESS != rc) { + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { return rc; }