diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index c1997d3841f..342e5028afe 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -358,13 +358,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -374,13 +368,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, newcomp->c_contextid, comm->c_contextid ); /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -609,13 +597,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -634,36 +616,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); exit: - if ( NULL != results ) { - free ( results ); - } - if ( NULL != sorted ) { - free ( sorted ); - } - if ( NULL != rresults) { - free ( rresults ); - } - if ( NULL != rsorted ) { - free ( rsorted ); - } - if ( NULL != lranks ) { - free ( lranks ); - } - if ( NULL != rranks ) { - free ( rranks ); - } + free ( results ); + free ( sorted ); + free ( rresults ); + free ( rsorted ); + free ( lranks ); + free ( rranks ); /* Step 4: if we are not part of the comm, free the struct */ /* --------------------------------------------------------- */ @@ -675,7 +636,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } *newcomm = newcomp; - return ( rc ); + return rc; } @@ -925,13 +886,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -950,13 +905,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -1031,13 +980,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1047,13 +990,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1062,11 +999,15 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp return MPI_SUCCESS; } -struct ompi_comm_idup_with_info_context { +struct ompi_comm_idup_with_info_context_t { + opal_object_t super; ompi_communicator_t *comm; ompi_communicator_t *newcomp; }; +typedef struct ompi_comm_idup_with_info_context_t ompi_comm_idup_with_info_context_t; +OBJ_CLASS_INSTANCE(ompi_comm_idup_with_info_context_t, opal_object_t, NULL, NULL); + static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request); static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request); static int ompi_comm_idup_getcid (ompi_comm_request_t *request); @@ -1085,7 +1026,7 @@ int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *group, ompi_group_t *remote_group, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req) { - struct ompi_comm_idup_with_info_context *context; + ompi_comm_idup_with_info_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq[1]; int rc; @@ -1101,7 +1042,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); + context = OBJ_NEW(ompi_comm_idup_with_info_context_t); if (NULL == context) { ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; @@ -1109,7 +1050,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro context->comm = comm; - request->context = context; + request->context = &context->super; rc = ompi_comm_set_nb (&context->newcomp, /* new comm */ comm, /* old comm */ @@ -1142,8 +1083,8 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro static int ompi_comm_idup_getcid (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1154,11 +1095,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid_nb (context->newcomp, /* new communicator */ - context->comm, /* old comm */ - NULL, /* bridge comm */ - mode, /* mode */ - subreq); /* new subrequest */ + rc = ompi_comm_nextcid_nb (context->newcomp, context->comm, NULL, NULL, + NULL, false, mode, subreq); if (OMPI_SUCCESS != rc) { ompi_comm_request_return (request); return rc; @@ -1171,8 +1109,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1187,7 +1125,7 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) context->newcomp->c_contextid, context->comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, subreq); + rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, NULL, NULL, false, mode, subreq); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1233,13 +1171,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - newcomp, /* bridge comm (used to pass the group into the group allreduce) */ - &tag, /* user defined tag */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1249,13 +1181,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - newcomp, - &tag, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1924,13 +1850,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, int ret = OMPI_SUCCESS; /* Determine context id. It is identical to f_2_c_handle */ - ret = ompi_comm_nextcid ( new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ + ret = ompi_comm_nextcid (new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; @@ -1953,15 +1874,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, goto complete_and_return; } - ret = ompi_comm_activate( &new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ - - + ret = ompi_comm_activate (&new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index f0c332b5dc1..ab9c9961198 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -14,7 +14,7 @@ * Copyright (c) 2007 Voltaire All rights reserved. * Copyright (c) 2006-2010 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. @@ -44,7 +44,90 @@ #include "ompi/request/request.h" #include "ompi/runtime/mpiruntime.h" -BEGIN_C_DECLS +struct ompi_comm_cid_context_t; + +typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + struct ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); + + +struct ompi_comm_cid_context_t { + opal_object_t super; + + ompi_communicator_t *newcomm; + ompi_communicator_t **newcommp; + ompi_communicator_t *comm; + ompi_communicator_t *bridgecomm; + + ompi_comm_allreduce_impl_fn_t allreduce_fn; + + int nextcid; + int nextlocal_cid; + int start; + int flag, rflag; + int local_leader; + int remote_leader; + int iter; + /** storage for activate barrier */ + int ok; + char *port_string; + bool send_first; + int pml_tag; + char *pmix_tag; +}; + +typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t; + +static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context) +{ + free (context->port_string); + free (context->pmix_tag); +} + +OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t, + mca_comm_cid_context_construct, + mca_comm_cid_context_destruct); + +struct ompi_comm_allreduce_context_t { + opal_object_t super; + + int *inbuf; + int *outbuf; + int count; + struct ompi_op_t *op; + ompi_comm_cid_context_t *cid_context; + int *tmpbuf; + + /* for intercomm allreduce */ + int *rcounts; + int *rdisps; + + /* for group allreduce */ + int peers_comm[3]; +}; + +typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t; + +static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context) +{ + free (context->tmpbuf); + free (context->rcounts); + free (context->rdisps); +} + +OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t, + ompi_comm_allreduce_context_construct, + ompi_comm_allreduce_context_destruct); /** * These functions make sure, that we determine the global result over @@ -53,90 +136,29 @@ BEGIN_C_DECLS * and a bridge-comm (intercomm-create scenario). */ - -typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_ledaer, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - /* non-blocking intracommunicator allreduce */ -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); /* non-blocking intercommunicator allreduce */ -static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static int ompi_comm_register_cid (uint32_t contextid); -static int ompi_comm_unregister_cid (uint32_t contextid); -static uint32_t ompi_comm_lowest_cid ( void ); - -struct ompi_comm_reg_t{ - opal_list_item_t super; - uint32_t cid; -}; -typedef struct ompi_comm_reg_t ompi_comm_reg_t; -OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_comm_reg_t); - -static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom); -static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom); +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -OBJ_CLASS_INSTANCE (ompi_comm_reg_t, - opal_list_item_t, - ompi_comm_reg_constructor, - ompi_comm_reg_destructor ); +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static opal_mutex_t ompi_cid_lock; -static opal_list_t ompi_registered_comms; +static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT; int ompi_comm_cid_init (void) @@ -144,239 +166,164 @@ int ompi_comm_cid_init (void) return OMPI_SUCCESS; } -int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, int send_first ) +static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, const char *pmix_tag, bool send_first, + int mode) { + ompi_comm_cid_context_t *context; + ompi_comm_request_t *request; int ret; - int nextcid; - bool flag; - int nextlocal_cid; - int done=0; - int response, glresponse=0; - int start; - unsigned int i; - int iter=0; - ompi_comm_cid_allredfct* allredfnct; - - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; + context = OBJ_NEW(ompi_comm_cid_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - start = ompi_mpi_communicators.lowest_free; - - while (!done) { - /** - * This is the real algorithm described in the doc - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - continue; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - nextlocal_cid = mca_pml.pml_max_contextid; - flag = false; - for (i=start; i < mca_pml.pml_max_contextid ; i++) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, comm); - if (true == flag) { - nextlocal_cid = i; - break; - } - } - - ret = (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - goto release_and_return; - } + context->newcomm = newcomm; + context->comm = comm; + context->bridgecomm = bridgecomm; - if (mca_pml.pml_max_contextid == (unsigned int) nextcid) { - /* at least one peer ran out of CIDs */ - if (flag) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto release_and_return; - } + /* Determine which implementation of allreduce we have to use + * for the current mode. */ + switch (mode) { + case OMPI_COMM_CID_INTRA: + context->allreduce_fn = ompi_comm_allreduce_intra_nb; + break; + case OMPI_COMM_CID_INTER: + context->allreduce_fn = ompi_comm_allreduce_inter_nb; + break; + case OMPI_COMM_CID_GROUP: + context->allreduce_fn = ompi_comm_allreduce_group_nb; + context->pml_tag = ((int *) arg0)[0]; + break; + case OMPI_COMM_CID_INTRA_PMIX: + context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb; + context->local_leader = ((int *) arg0)[0]; + if (arg1) { + context->port_string = strdup ((char *) arg1); } + context->pmix_tag = strdup ((char *) pmix_tag); + break; + case OMPI_COMM_CID_INTRA_BRIDGE: + context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb; + context->local_leader = ((int *) arg0)[0]; + context->remote_leader = ((int *) arg1)[0]; + break; + default: + OBJ_RELEASE(context); + return NULL; + } + + context->send_first = send_first; + context->iter = 0; + + return context; +} - if (nextcid == nextlocal_cid) { - response = 1; /* fine with me */ - } - else { - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextlocal_cid, NULL); - - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - nextcid, comm ); - if (true == flag) { - response = 1; /* works as well */ - } - else { - response = 0; /* nope, not acceptable */ - } - } +static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context) +{ + ompi_comm_allreduce_context_t *context; - ret = (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextcid, NULL); - goto release_and_return; - } - if (1 == glresponse) { - done = 1; /* we are done */ - break; - } - else if ( 0 == glresponse ) { - if ( 1 == response ) { - /* we could use that, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextcid, NULL); - } - start = nextcid+1; /* that's where we can start the next round */ - } + context = OBJ_NEW(ompi_comm_allreduce_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - /* set the according values to the newcomm */ - newcomm->c_contextid = nextcid; - opal_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); - - release_and_return: - ompi_comm_unregister_cid (comm->c_contextid); + context->inbuf = inbuf; + context->outbuf = outbuf; + context->count = count; + context->op = op; + context->cid_context = cid_context; - return ret; + return context; } -/* Non-blocking version of ompi_comm_nextcid */ -struct mca_comm_nextcid_context { - ompi_communicator_t* newcomm; - ompi_communicator_t* comm; - ompi_communicator_t* bridgecomm; - int mode; - int nextcid; - int nextlocal_cid; - int start; - int flag, rflag; -}; - /* find the next available local cid and start an allreduce */ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); /* verify that the maximum cid is locally available and start an allreduce */ static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); +/* lock the cid generator */ +static int ompi_comm_cid_lock (ompi_comm_request_t *request); -int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req) { - struct mca_comm_nextcid_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; int ret; - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; - } - - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1, + "nextcid", send_first, mode); if (NULL == context) { - ompi_comm_unregister_cid (comm->c_contextid); return OMPI_ERR_OUT_OF_RESOURCE; } + context->start = ompi_mpi_communicators.lowest_free; + request = ompi_comm_request_get (); if (NULL == request) { - ompi_comm_unregister_cid (comm->c_contextid); - free (context); + OBJ_RELEASE(context); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; - context->bridgecomm = bridgecomm; - context->mode = mode; - context->start = ompi_mpi_communicators.lowest_free; - - request->context = context; + request->context = &context->super; - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); ompi_comm_request_start (request); *req = &request->super; + return OMPI_SUCCESS; } +int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + +static int ompi_comm_cid_lock (ompi_comm_request_t *request) +{ + if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); +} + static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; - unsigned int i; bool flag; int ret; /** * This is the real algorithm described in the doc */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (context->comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); - - return OMPI_SUCCESS; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; - for (i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { + for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, context->comm); if (true == flag) { @@ -385,15 +332,10 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) } } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } - + ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, + context, &subreq); if (OMPI_SUCCESS != ret) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -403,18 +345,17 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } /* next we want to verify that the resulting commid is ok */ - ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); - - return OMPI_SUCCESS; + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); } static int ompi_comm_checkcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; int ret; @@ -423,37 +364,31 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) if (!context->flag) { opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); - context->flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - context->nextcid, context->comm); + context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, + context->nextcid, context->comm); } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } + ++context->iter; - if (OMPI_SUCCESS != ret) { - return ret; + ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MAX, context, &subreq); + if (OMPI_SUCCESS == ret) { + ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } - ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); - - return OMPI_SUCCESS; + return ret; } static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); - ompi_comm_unregister_cid (context->comm->c_contextid); + /* unlock the cid generator */ + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! */ return OMPI_SUCCESS; @@ -461,118 +396,16 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) if (1 == context->flag) { /* we could use this cid, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL); + opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL); context->start = context->nextcid + 1; /* that's where we can start the next round */ } + ++context->iter; + /* try again */ return ompi_comm_allreduce_getnextcid (request); } -/**************************************************************************/ -/**************************************************************************/ -/**************************************************************************/ -static void ompi_comm_reg_constructor (ompi_comm_reg_t *regcom) -{ - regcom->cid=MPI_UNDEFINED; -} - -static void ompi_comm_reg_destructor (ompi_comm_reg_t *regcom) -{ -} - -void ompi_comm_reg_init (void) -{ - OBJ_CONSTRUCT(&ompi_registered_comms, opal_list_t); - OBJ_CONSTRUCT(&ompi_cid_lock, opal_mutex_t); -} - -void ompi_comm_reg_finalize (void) -{ - OBJ_DESTRUCT(&ompi_registered_comms); - OBJ_DESTRUCT(&ompi_cid_lock); -} - - -static int ompi_comm_register_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t); - bool registered = false; - - do { - /* Only one communicator function allowed in same time on the - * same communicator. - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - - newentry->cid = cid; - if ( !(opal_list_is_empty (&ompi_registered_comms)) ) { - bool ok = true; - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if ( regcom->cid > cid ) { - break; - } -#if OMPI_ENABLE_THREAD_MULTIPLE - if( regcom->cid == cid ) { - /** - * The MPI standard state that is the user responsability to - * schedule the global communications in order to avoid any - * kind of troubles. As, managing communicators involve several - * collective communications, we should enforce a sequential - * execution order. This test only allow one communicator - * creation function based on the same communicator. - */ - ok = false; - break; - } -#endif /* OMPI_ENABLE_THREAD_MULTIPLE */ - } - if (ok) { - opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *) regcom, - (opal_list_item_t *)newentry); - registered = true; - } - } else { - opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry); - registered = true; - } - - /* drop the lock before trying again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - } while (!registered); - - return OMPI_SUCCESS; -} - -static int ompi_comm_unregister_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - - OPAL_THREAD_LOCK(&ompi_cid_lock); - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if(regcom->cid == cid) { - opal_list_remove_item(&ompi_registered_comms, (opal_list_item_t *) regcom); - OBJ_RELEASE(regcom); - break; - } - } - - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - - return OMPI_SUCCESS; -} - -static uint32_t ompi_comm_lowest_cid (void) -{ - ompi_comm_reg_t *regcom=NULL; - opal_list_item_t *item=opal_list_get_first (&ompi_registered_comms); - - regcom = (ompi_comm_reg_t *)item; - return regcom->cid; -} /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ @@ -590,174 +423,43 @@ static uint32_t ompi_comm_lowest_cid (void) * comm.c is, that this file contains the allreduce implementations * which are required, and thus we avoid having duplicate code... */ -int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ) -{ - int ret = 0; - - int ok=0, gok=0; - ompi_comm_cid_allredfct* allredfnct; - - /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - - if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { - /* Initialize the PML stuff in the newcomm */ - if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { - goto bail_on_error; - } - - OMPI_COMM_SET_PML_ADDED(*newcomm); - } - - - ret = (allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "activate", 0 ); - if( OMPI_SUCCESS != ret ) { - goto bail_on_error; - } - - - - /** - * Check to see if this process is in the new communicator. - * - * Specifically, this function is invoked by all proceses in the - * old communicator, regardless of whether they are in the new - * communicator or not. This is because it is far simpler to use - * MPI collective functions on the old communicator to determine - * some data for the new communicator (e.g., remote_leader) than - * to kludge up our own pseudo-collective routines over just the - * processes in the new communicator. Hence, *all* processes in - * the old communicator need to invoke this function. - * - * That being said, only processes in the new communicator need to - * select a coll module for the new communicator. More - * specifically, proceses who are not in the new communicator - * should *not* select a coll module -- for example, - * ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who - * are not in the new communicator. This can cause errors in the - * selection / initialization of a coll module. Plus, it's - * wasteful -- processes in the new communicator will end up - * freeing the new communicator anyway, so we might as well leave - * the coll selection as NULL (the coll base comm unselect code - * handles that case properly). - */ - if (MPI_UNDEFINED == (*newcomm)->c_local_group->grp_my_rank) { - return OMPI_SUCCESS; - } - - /* Let the collectives components fight over who will do - collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*newcomm))) { - goto bail_on_error; - } - - /* For an inter communicator, we have to deal with the potential - * problem of what is happening if the local_comm that we created - * has a lower CID than the parent comm. This is not a problem - * as long as the user calls MPI_Comm_free on the inter communicator. - * However, if the communicators are not freed by the user but released - * by Open MPI in MPI_Finalize, we walk through the list of still available - * communicators and free them one by one. Thus, local_comm is freed before - * the actual inter-communicator. However, the local_comm pointer in the - * inter communicator will still contain the 'previous' address of the local_comm - * and thus this will lead to a segmentation violation. In order to prevent - * that from happening, we increase the reference counter local_comm - * by one if its CID is lower than the parent. We cannot increase however - * its reference counter if the CID of local_comm is larger than - * the CID of the inter communicators, since a regular MPI_Comm_free would - * leave in that the case the local_comm hanging around and thus we would not - * recycle CID's properly, which was the reason and the cause for this trouble. - */ - if ( OMPI_COMM_IS_INTER(*newcomm)) { - if ( OMPI_COMM_CID_IS_LOWER(*newcomm, comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*newcomm); - OBJ_RETAIN (*newcomm); - } - } - - - return OMPI_SUCCESS; - - bail_on_error: - OBJ_RELEASE(*newcomm); - *newcomm = MPI_COMM_NULL; - return ret; -} /* Non-blocking version of ompi_comm_activate */ -struct ompi_comm_activate_nb_context { - ompi_communicator_t **newcomm; - ompi_communicator_t *comm; - - /* storage for activate barrier */ - int ok; -}; - static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request); -int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req) { - struct ompi_comm_activate_nb_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; + int local_leader; + int remote_leader; int ret = 0; - request = ompi_comm_request_get (); - if (NULL == request) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate", + send_first, mode); if (NULL == context) { - ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; + /* keep track of the pointer so it can be set to MPI_COMM_NULL on failure */ + context->newcommp = newcomm; - request->context = context; - - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { /* Initialize the PML stuff in the newcomm */ if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { OBJ_RELEASE(newcomm); + OBJ_RELEASE(context); *newcomm = MPI_COMM_NULL; return ret; } @@ -765,16 +467,10 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, } /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - if (mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } - + * send messages over the new communicator + */ + ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MAX, context, + &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); return ret; @@ -788,10 +484,28 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, return OMPI_SUCCESS; } +int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) { - struct ompi_comm_activate_nb_context *context = - (struct ompi_comm_activate_nb_context *) request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; int ret; /** @@ -818,15 +532,15 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * the coll selection as NULL (the coll base comm unselect code * handles that case properly). */ - if (MPI_UNDEFINED == (*context->newcomm)->c_local_group->grp_my_rank) { + if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) { return OMPI_SUCCESS; } /* Let the collectives components fight over who will do collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*context->newcomm))) { - OBJ_RELEASE(*context->newcomm); - *context->newcomm = MPI_COMM_NULL; + if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) { + OBJ_RELEASE(context->newcomm); + *context->newcommp = MPI_COMM_NULL; return ret; } @@ -847,10 +561,10 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * leave in that the case the local_comm hanging around and thus we would not * recycle CID's properly, which was the reason and the cause for this trouble. */ - if (OMPI_COMM_IS_INTER(*context->newcomm)) { - if (OMPI_COMM_CID_IS_LOWER(*context->newcomm, context->comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*context->newcomm); - OBJ_RETAIN (*context->newcomm); + if (OMPI_COMM_IS_INTER(context->newcomm)) { + if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) { + OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm); + OBJ_RETAIN (context->newcomm); } } @@ -861,207 +575,47 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *context, ompi_request_t **req) { - return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm, - comm->c_coll.coll_allreduce_module ); -} + ompi_communicator_t *comm = context->comm; -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - ompi_request_t **req) -{ return comm->c_coll.coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm, req, comm->c_coll.coll_iallreduce_module); } - -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) -{ - int local_rank, rsize; - int rc; - int *sbuf; - int *tmpbuf=NULL; - int *rcounts=NULL, scount=0; - int *rdisps=NULL; - - if ( !OMPI_COMM_IS_INTER (intercomm)) { - return MPI_ERR_COMM; - } - - /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); - local_rank = ompi_comm_rank ( intercomm ); - - tmpbuf = (int *) malloc ( count * sizeof(int)); - rdisps = (int *) calloc ( rsize, sizeof(int)); - rcounts = (int *) calloc ( rsize, sizeof(int) ); - if ( OPAL_UNLIKELY (NULL == tmpbuf || NULL == rdisps || NULL == rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - /* Execute the inter-allreduce: the result of our group will - be in the buffer of the remote group */ - rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, intercomm, - intercomm->c_coll.coll_allreduce_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - if ( 0 == local_rank ) { - MPI_Request req; - - /* for the allgatherv later */ - scount = count; - - /* local leader exchange their data and determine the overall result - for both groups */ - rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - intercomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, - intercomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait ( &req, MPI_STATUS_IGNORE ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); - } - - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. - */ - rcounts[0] = count; - sbuf = outbuf; - rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf, - rcounts, rdisps, MPI_INT, - intercomm, - intercomm->c_coll.coll_allgatherv_module); - - exit: - if ( NULL != tmpbuf ) { - free ( tmpbuf ); - } - if ( NULL != rcounts ) { - free ( rcounts ); - } - if ( NULL != rdisps ) { - free ( rdisps ); - } - - return (rc); -} - /* Non-blocking version of ompi_comm_allreduce_inter */ -struct ompi_comm_allreduce_inter_context { - int *inbuf; - int *outbuf; - int count; - struct ompi_op_t *op; - ompi_communicator_t *intercomm; - ompi_communicator_t *bridgecomm; - int *tmpbuf; - int *rcounts; - int *rdisps; -}; - -static void ompi_comm_allreduce_inter_context_free (struct ompi_comm_allreduce_inter_context *context) -{ - if (context->tmpbuf) { - free (context->tmpbuf); - } - - if (context->rdisps) { - free (context->rdisps); - } - - if (context->rcounts) { - free (context->rcounts); - } - - free (context); -} - static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request); -/* Arguments not used in this implementation: - * - bridgecomm - */ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, + ompi_comm_cid_context_t *cid_context, ompi_request_t **req) { - struct ompi_comm_allreduce_inter_context *context = NULL; - ompi_comm_request_t *request = NULL; + ompi_communicator_t *intercomm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; ompi_request_t *subreq; int local_rank, rsize, rc; - if (!OMPI_COMM_IS_INTER (intercomm)) { + if (!OMPI_COMM_IS_INTER (cid_context->comm)) { return MPI_ERR_COMM; } request = ompi_comm_request_get (); - if (NULL == request) { + if (OPAL_UNLIKELY(NULL == request)) { return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); - if (NULL == context) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - context->inbuf = inbuf; - context->outbuf = outbuf; - context->count = count; - context->op = op; - context->intercomm = intercomm; - context->bridgecomm = bridgecomm; + request->context = &context->super; /* Allocate temporary arrays */ rsize = ompi_comm_remote_size (intercomm); @@ -1071,18 +625,17 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, context->rdisps = (int *) calloc (rsize, sizeof(int)); context->rcounts = (int *) calloc (rsize, sizeof(int)); if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - request->context = context; - /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group * and vise-versa. */ rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, &subreq, intercomm->c_coll.coll_iallreduce_module); - if (OMPI_SUCCESS != rc) { - goto exit; + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + ompi_comm_request_return (request); + return rc; } if (0 == local_rank) { @@ -1094,58 +647,37 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, ompi_comm_request_start (request); *req = &request->super; -exit: - if (OMPI_SUCCESS != rc) { - if (context) { - ompi_comm_allreduce_inter_context_free (context); - } - - if (request) { - request->context = NULL; - ompi_comm_request_return (request); - } - } - - return rc; + return OMPI_SUCCESS; } static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreqs[2]; int rc; /* local leader exchange their data and determine the overall result for both groups */ rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - context->intercomm, subreqs)); - if ( OMPI_SUCCESS != rc ) { - goto exit; + intercomm, subreqs)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, context->intercomm, subreqs + 1)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); - -exit: - if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; + MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - return rc; + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); } static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); @@ -1155,8 +687,8 @@ static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreq; int scount = 0, rc; @@ -1167,245 +699,369 @@ static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) the result first. */ - if (0 != ompi_comm_rank (context->intercomm)) { + if (0 != ompi_comm_rank (intercomm)) { context->rcounts[0] = context->count; } else { scount = context->count; } - rc = context->intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, - context->rcounts, context->rdisps, MPI_INT, - context->intercomm, &subreq, - context->intercomm->c_coll.coll_iallgatherv_module); + rc = intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, + context->rcounts, context->rdisps, MPI_INT, intercomm, + &subreq, intercomm->c_coll.coll_iallgatherv_module); if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; return rc; } - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather_complete, &subreq, 1); + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); +} - return OMPI_SUCCESS; +static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *comm = context->cid_context->comm; + ompi_request_t *subreq; + int rc; + + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, + context->cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ibcast_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); } -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request) +static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request) { - /* free this request's context */ - ompi_comm_allreduce_inter_context_free (request->context); - /* prevent a double-free from the progress engine */ - request->context = NULL; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; - /* done */ - return OMPI_SUCCESS; + /* step 3: reduce leader data */ + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); + + /* schedule the broadcast to local peers */ + return ompi_comm_allreduce_bridged_schedule_bcast (request); } -/* Arguments not used in this implementation: - * - send_first - */ -static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bcomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm; + ompi_request_t *subreq[2]; + int rc; + + /* step 2: leader exchange */ + rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm, + subreq)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2); +} + +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - int *tmpbuf=NULL; - int local_rank; + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; int i; int rc; - int local_leader, remote_leader; - local_leader = (*((int*)lleader)); - remote_leader = (*((int*)rleader)); + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (local_rank == cid_context->local_leader) { + context->tmpbuf = (int *) calloc (count, sizeof (int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } - if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op && - &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { - return MPI_ERR_OP; + request = ompi_comm_request_get (); + if (OPAL_UNLIKELY(NULL == request)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + request->context = &context->super; + + if (cid_context->local_leader == local_rank) { + memcpy (context->tmpbuf, inbuf, count * sizeof (int)); } - /* Intercomm_create */ - rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, comm, comm->c_coll.coll_allreduce_module ); + /* step 1: reduce to the local leader */ + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, &subreq, + comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - MPI_Request req; + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - bcomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, bcomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait( &req, MPI_STATUS_IGNORE); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - if ( &ompi_mpi_op_max.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] > outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_min.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] < outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_sum.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] += tmpbuf[i]; - } - } - else if ( &ompi_mpi_op_prod.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] *= tmpbuf[i]; - } - } + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; } - rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, - comm, comm->c_coll.coll_bcast_module ); + ompi_comm_request_start (request); - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); - } + *req = &request->super; - return (rc); + return OMPI_SUCCESS; } -/* Arguments not used in this implementation: - * - bridgecomm - * - * lleader is the local rank of root in comm - * rleader is the port_string - */ -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request) { - int *tmpbuf=NULL; - int rc; - int local_leader, local_rank; - char *port_string; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int32_t size_count = context->count; opal_value_t info; opal_pmix_pdata_t pdat; opal_buffer_t sbuf; - int32_t size_count; + int rc; + + fprintf (stderr, "reduce complete\n"); + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + + if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) { + OBJ_DESTRUCT(&sbuf); + fprintf (stderr, "pack failed. rc %d\n", rc); + return rc; + } + + OBJ_CONSTRUCT(&info, opal_value_t); + OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + + info.type = OPAL_BYTE_OBJECT; + pdat.value.type = OPAL_BYTE_OBJECT; + + opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); + OBJ_DESTRUCT(&sbuf); + + if (cid_context->send_first) { + (void)asprintf(&info.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } else { + (void)asprintf(&info.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } + + fprintf (stderr, "%s, %s\n", info.key, pdat.value.key); + + /* this macro is not actually non-blocking. if a non-blocking version becomes available this function + * needs to be reworked to take advantage of it. */ + OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); + OBJ_DESTRUCT(&info); + fprintf (stderr, "OPAL_PMIX_EXCHANGE returned %d\n", rc); + if (OPAL_SUCCESS != rc) { + OBJ_DESTRUCT(&pdat); + return rc; + } + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); + pdat.value.data.bo.bytes = NULL; + pdat.value.data.bo.size = 0; + OBJ_DESTRUCT(&pdat); + + rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT); + OBJ_DESTRUCT(&sbuf); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + return rc; + } - local_leader = (*((int*)lleader)); - port_string = (char*)rleader; - size_count = count; + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT); - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + return ompi_comm_allreduce_bridged_schedule_bcast (request); +} + +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) +{ + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; + int rc; + + fprintf (stderr, "Calling ompi_comm_allreduce_intra_pmix_nb. rank %d\n", local_rank); + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (cid_context->local_leader == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + /* comm is an intra-communicator */ - rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm, - comm->c_coll.coll_allreduce_module); + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, tmpbuf, (int32_t)count, OPAL_INT))) { - goto exit; - } - OBJ_CONSTRUCT(&info, opal_value_t); - OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - info.type = OPAL_BYTE_OBJECT; - pdat.value.type = OPAL_BYTE_OBJECT; + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; + } - opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); - OBJ_DESTRUCT(&sbuf); + ompi_comm_request_start (request); + *req = (ompi_request_t *) request; - if (send_first) { - (void)asprintf(&info.key, "%s:%s:send:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", port_string, tag, iter); - } else { - (void)asprintf(&info.key, "%s:%s:recv:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:send:%d", port_string, tag, iter); - } + /* use the same function as bridged to schedule the broadcast */ + return OMPI_SUCCESS; +} - OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); - OBJ_DESTRUCT(&info); +static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + ompi_request_t *subreq[2]; + int subreq_count = 0; + int rc; - if (OPAL_SUCCESS != rc) { - OBJ_DESTRUCT(&pdat); - goto exit; + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq + subreq_count++)); + if (OMPI_SUCCESS != rc) { + return rc; + } } - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); - opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); - pdat.value.data.bo.bytes = NULL; - pdat.value.data.bo.size = 0; - OBJ_DESTRUCT(&pdat); + } + + return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count); +} - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&sbuf, outbuf, &size_count, OPAL_INT))) { - OBJ_DESTRUCT(&sbuf); - goto exit; +static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int *tmp = context->tmpbuf; + ompi_request_t *subreq[2]; + int rc; + + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT); + tmp += context->count; } - OBJ_DESTRUCT(&sbuf); - count = (int)size_count; - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); } - rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, - local_leader, comm, - comm->c_coll.coll_bcast_module); + if (MPI_PROC_NULL != context->peers_comm[0]) { + /* interior node */ + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq)); + if (OMPI_SUCCESS != rc) { + return rc; + } + + rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, cid_context->comm, subreq + 1)); + if (OMPI_SUCCESS != rc) { + return rc; + } - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2); } - return (rc); + /* root */ + return ompi_comm_allreduce_group_broadcast (request); } -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *newcomm, - void* local_leader, - void* remote_leader, - int send_first, char *intag, int iter) +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - ompi_group_t *group = newcomm->c_local_group; - int peers_group[3], peers_comm[3]; + ompi_group_t *group = cid_context->newcomm->c_local_group; const int group_size = ompi_group_size (group); const int group_rank = ompi_group_rank (group); - int tag = *((int *) local_leader); - int *tmp1; - int i, rc=OMPI_SUCCESS; + ompi_communicator_t *comm = cid_context->comm; + int peers_group[3], *tmp, subreq_count = 0; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; + ompi_request_t *subreq[3]; + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (NULL == context) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + tmp = context->tmpbuf = calloc (sizeof (int), count * 3); + if (NULL == context->tmpbuf) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request->context = &context->super; /* basic recursive doubling allreduce on the group */ peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL; @@ -1413,54 +1069,28 @@ static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL; /* translate the ranks into the ranks of the parent communicator */ - ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, peers_comm); - - tmp1 = malloc (sizeof (int) * count); + ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm); /* reduce */ memmove (outbuf, inbuf, sizeof (int) * count); - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(recv(tmp1, count, MPI_INT, peers_comm[i], tag, comm, - MPI_STATUS_IGNORE)); + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1], + cid_context->pml_tag, comm, subreq + subreq_count++)); if (OMPI_SUCCESS != rc) { - goto out; + ompi_comm_request_return (request); + return rc; } - /* this is integer reduction so we do not care about ordering */ - ompi_op_reduce (op, tmp1, outbuf, count, MPI_INT); - } - } - - if (MPI_PROC_NULL != peers_comm[0]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[0], - tag, MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - rc = MCA_PML_CALL(recv(outbuf, count, MPI_INT, peers_comm[0], - tag, comm, MPI_STATUS_IGNORE)); - if (OMPI_SUCCESS != rc) { - goto out; + tmp += count; } } - /* broadcast */ - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[i], tag, - MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - } - } + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count); - out: - free (tmp1); + ompi_comm_request_start (request); + *req = &request->super; - return rc; + return OMPI_SUCCESS; } - -END_C_DECLS diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index f2acab5bbec..f453ca1e8e1 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -206,10 +206,6 @@ int ompi_comm_init(void) OBJ_RETAIN(&ompi_mpi_group_null.group); OBJ_RETAIN(&ompi_mpi_errors_are_fatal.eh); - /* initialize the comm_reg stuff for multi-threaded comm_cid - allocation */ - ompi_comm_reg_init(); - /* initialize communicator requests (for ompi_comm_idup) */ ompi_comm_request_init (); @@ -328,13 +324,9 @@ int ompi_comm_finalize(void) } } - OBJ_DESTRUCT (&ompi_mpi_communicators); OBJ_DESTRUCT (&ompi_comm_f_to_c_table); - /* finalize the comm_reg stuff */ - ompi_comm_reg_finalize(); - /* finalize communicator requests */ ompi_comm_request_fini (); diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c index 496de0319aa..f7372e27452 100644 --- a/ompi/communicator/comm_request.c +++ b/ompi/communicator/comm_request.c @@ -234,6 +234,7 @@ static void ompi_comm_request_destruct (ompi_comm_request_t *request) { OBJ_DESTRUCT(&request->schedule); } + OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t, ompi_comm_request_construct, ompi_comm_request_destruct); @@ -257,10 +258,10 @@ ompi_comm_request_t *ompi_comm_request_get (void) void ompi_comm_request_return (ompi_comm_request_t *request) { if (request->context) { - free (request->context); - request->context = NULL; + OBJ_RELEASE (request->context); } + OMPI_REQUEST_FINI(&request->super); opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request); } diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h index 246a3010b0e..65af613f95f 100644 --- a/ompi/communicator/comm_request.h +++ b/ompi/communicator/comm_request.h @@ -22,7 +22,7 @@ typedef struct ompi_comm_request_t { ompi_request_t super; - void *context; + opal_object_t *context; opal_list_t schedule; } ompi_comm_request_t; OBJ_CLASS_DECLARATION(ompi_comm_request_t); diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 455defb9cc5..a8d4756cffa 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -492,24 +492,27 @@ ompi_communicator_t* ompi_comm_allocate (int local_group_size, * @param mode: combination of input * OMPI_COMM_CID_INTRA: intra-comm * OMPI_COMM_CID_INTER: inter-comm + * OMPI_COMM_CID_GROUP: only decide CID within the ompi_group_t + * associated with the communicator. arg0 + * must point to an int which will be used + * as the pml tag for communication. * OMPI_COMM_CID_INTRA_BRIDGE: 2 intracomms connected by - * a bridge comm. local_leader - * and remote leader are in this - * case an int (rank in bridge-comm). + * a bridge comm. arg0 and arg1 must point + * to integers representing the local and + * remote leader ranks. the remote leader rank + * is a rank in the bridgecomm. * OMPI_COMM_CID_INTRA_PMIX: 2 intracomms, leaders talk - * through PMIx. lleader and rleader - * are the required contact information. + * through PMIx. arg0 must point to an integer + * representing the local leader rank. arg1 + * must point to a string representing the + * port of the remote leader. * @param send_first: to avoid a potential deadlock for * the OOB version. * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* oldcomm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first); +OMPI_DECLSPEC int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode); /** * allocate new communicator ID (non-blocking) @@ -521,10 +524,9 @@ OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, * OMPI_COMM_CID_INTER: inter-comm * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req); +OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req); /** * shut down the communicator infrastructure. @@ -617,18 +619,25 @@ int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high ); -OMPI_DECLSPEC int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ); +OMPI_DECLSPEC int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode); -OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req); +/** + * Non-blocking variant of comm_activate. + * + * @param[inout] newcomm New communicator + * @param[in] comm Parent communicator + * @param[in] bridgecomm Bridge communicator (used for PMIX and bridge modes) + * @param[in] arg0 Mode argument 0 + * @param[in] arg1 Mode argument 1 + * @param[in] send_first Send first from this process (PMIX mode only) + * @param[in] mode Collective mode + * @param[out] req New request object to track this operation + */ +OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req); /** * a simple function to dump the structure @@ -638,14 +647,6 @@ int ompi_comm_dump ( ompi_communicator_t *comm ); /* setting name */ int ompi_comm_set_name (ompi_communicator_t *comm, const char *name ); -/* - * these are the init and finalize functions for the comm_reg - * stuff. These routines are necessary for handling multi-threading - * scenarious in the communicator_cid allocation - */ -void ompi_comm_reg_init(void); -void ompi_comm_reg_finalize(void); - /* global variable to save the number od dynamic communicators */ extern int ompi_comm_num_dyncomm; diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index 2da39d920a4..7619e8a219f 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -469,25 +469,25 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, new_group_pointer = MPI_GROUP_NULL; /* allocate comm_cid */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_nextcid ( newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } /* activate comm and init coll-component */ - rc = ompi_comm_activate ( &newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_activate ( &newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } @@ -500,7 +500,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, exit: if (OMPI_SUCCESS != rc) { if (MPI_COMM_NULL != newcomp && NULL != newcomp) { - OBJ_RETAIN(newcomp); + OBJ_RELEASE(newcomp); newcomp = MPI_COMM_NULL; } } diff --git a/ompi/mpi/c/intercomm_create.c b/ompi/mpi/c/intercomm_create.c index b7e948cd624..10f0e8ad2ca 100644 --- a/ompi/mpi/c/intercomm_create.c +++ b/ompi/mpi/c/intercomm_create.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -196,26 +199,15 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader, new_group_pointer = MPI_GROUP_NULL; /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ - + rc = ompi_comm_nextcid (newcomp, local_comm, bridge_comm, &lleader, + &rleader, false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } /* activate comm and init coll-module */ - rc = ompi_comm_activate ( &newcomp, - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, local_comm, bridge_comm, &lleader, &rleader, + false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } diff --git a/ompi/mpi/c/intercomm_merge.c b/ompi/mpi/c/intercomm_merge.c index bcf8f9fec73..55258b637ef 100644 --- a/ompi/mpi/c/intercomm_merge.c +++ b/ompi/mpi/c/intercomm_merge.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -115,26 +118,16 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high, OBJ_RELEASE(new_group_pointer); new_group_pointer = MPI_GROUP_NULL; - /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + /* Determine context id */ + rc = ompi_comm_nextcid (newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; }