Skip to content

Commit

Permalink
Reduce the amount of temporary memory needed for MPI_Alltoallw.
Browse files Browse the repository at this point in the history
Dont copy the datatype into a buffer with the same extent, but instead
pack it and send it to the peer as packed.

Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
  • Loading branch information
bosilca committed Aug 31, 2021
1 parent 74049fc commit 447b289
Showing 1 changed file with 28 additions and 25 deletions.
53 changes: 28 additions & 25 deletions ompi/mca/coll/basic/coll_basic_alltoallw.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "opal/datatype/opal_convertor_internal.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
Expand All @@ -42,12 +43,11 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int i, j, size, rank, err = MPI_SUCCESS, max_size;
int i, j, size, rank, err = MPI_SUCCESS;
ompi_request_t *req;
char *save_buffer = NULL;
ptrdiff_t ext, gap = 0;

/* Initialize. */
size_t max_size = 0, packed_size;
opal_convertor_t convertor;

size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
Expand All @@ -57,11 +57,14 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con
return MPI_SUCCESS;
}

/* Find the largest receive amount */
/* Find the largest amount of packed send/recv data */
for (i = 0, max_size = 0 ; i < size ; ++i) {
ext = opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap);
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);

max_size = ext > max_size ? ext : max_size;
packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super,
ompi_proc->super.proc_convertor->master->remote_sizes);
packed_size *= rcounts[i];
max_size = packed_size > max_size ? packed_size : max_size;
}

/* Allocate a temporary buffer */
Expand All @@ -77,45 +80,45 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con
msg_size_i *= rcounts[i];
for (j = i+1 ; j < size ; ++j) {
size_t msg_size_j;
struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size};
uint32_t iov_count = 1;
ompi_datatype_type_size(rdtypes[j], &msg_size_j);
msg_size_j *= rcounts[j];

/* Initiate all send/recv to/from others. */
if (i == rank && msg_size_j != 0) {
char * tmp_buffer;
/* Shift the temporary buffer according to the current datatype */
(void)opal_datatype_span(&rdtypes[j]->super, rcounts[j], &gap);
tmp_buffer = save_buffer - gap;
/* Copy the data into the temporary buffer */
err = ompi_datatype_copy_content_same_ddt (rdtypes[j], rcounts[j],
tmp_buffer, (char *) rbuf + rdisps[j]);
if (MPI_SUCCESS != err) { goto error_hndl; }
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j);
opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j],
(char *) rbuf + rdisps[j]);
packed_size = max_size;
err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
if (1 != err) { goto error_hndl; }

/* Exchange data with the peer */
err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j],
j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
if (MPI_SUCCESS != err) { goto error_hndl; }

err = MCA_PML_CALL(send ((void *) tmp_buffer, rcounts[j], rdtypes[j],
err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { goto error_hndl; }
} else if (j == rank && msg_size_i != 0) {
char * tmp_buffer;
/* Shift the temporary buffer according to the current datatype */
(void)opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap);
tmp_buffer = save_buffer - gap;
/* Copy the data into the temporary buffer */
err = ompi_datatype_copy_content_same_ddt (rdtypes[i], rcounts[i],
tmp_buffer, (char *) rbuf + rdisps[i]);
if (MPI_SUCCESS != err) { goto error_hndl; }
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i],
(char *) rbuf + rdisps[i]);
packed_size = max_size;
err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
if (1 != err) { goto error_hndl; }

/* Exchange data with the peer */
err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i],
i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
if (MPI_SUCCESS != err) { goto error_hndl; }

err = MCA_PML_CALL(send ((void *) tmp_buffer, rcounts[i], rdtypes[i],
err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { goto error_hndl; }
Expand Down

0 comments on commit 447b289

Please sign in to comment.