Merge pull request #9493 from bosilca/topic/a2a_inplace5.0
Topic/a2a inplace5.0
awlauria authored Oct 14, 2021
2 parents afed069 + 531735c commit f22310c
Showing 6 changed files with 304 additions and 211 deletions.
142 changes: 83 additions & 59 deletions ompi/mca/coll/base/coll_base_alltoall.c
@@ -3,7 +3,7 @@
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation. All rights reserved.
- * Copyright (c) 2004-2017 The University of Tennessee and The University
+ * Copyright (c) 2004-2021 The University of Tennessee and The University
 *                         of Tennessee Research Foundation. All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -35,88 +35,112 @@
#include "coll_base_topo.h"
#include "coll_base_util.h"

-/* MPI_IN_PLACE all to all algorithm. TODO: implement a better one. */
+/*
+ * We want to minimize the amount of temporary memory needed while allowing as
+ * many ranks as possible to exchange data simultaneously. We use a variation of
+ * the ring algorithm, where in a single step a process exchanges data with both
+ * neighbors at distance k (on the left and the right of a logical ring topology).
+ * With this approach we only need to pack the data for one of the two neighbors,
+ * as we can then use the original buffer (and datatype and count) to send the
+ * data to the other.
+ */
int
mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
                                           struct ompi_datatype_t *rdtype,
                                           struct ompi_communicator_t *comm,
                                           mca_coll_base_module_t *module)
{
-    int i, j, size, rank, err = MPI_SUCCESS, line;
-    ptrdiff_t ext, gap = 0;
+    int i, size, rank, left, right, err = MPI_SUCCESS, line;
+    ptrdiff_t extent;
    ompi_request_t *req;
-    char *allocated_buffer = NULL, *tmp_buffer;
-    size_t max_size;
+    char *tmp_buffer;
+    size_t packed_size = 0, max_size;
+    opal_convertor_t convertor;

    /* Initialize. */

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

-    /* If only one process, we're done. */
-    if (1 == size) {
+    ompi_datatype_type_size(rdtype, &max_size);
+
+    /* Easy way out */
+    if ((1 == size) || (0 == rcount) || (0 == max_size) ) {
        return MPI_SUCCESS;
    }

-    /* Find the largest receive amount */
-    ompi_datatype_type_extent (rdtype, &ext);
-    max_size = opal_datatype_span(&rdtype->super, rcount, &gap);
+    /* Find the largest amount of packed send/recv data among all peers where
+     * we need to pack before the send.
+     */
+#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        right = (rank + i) % size;
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);
+
+        if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) {
+            packed_size = opal_datatype_compute_remote_size(&rdtype->super,
+                                                            ompi_proc->super.proc_convertor->master->remote_sizes);
+            max_size = packed_size > max_size ? packed_size : max_size;
+        }
+    }
+#endif  /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */
+    max_size *= rcount;

-    /* Initiate all send/recv to/from others. */
+    ompi_datatype_type_extent(rdtype, &extent);

    /* Allocate a temporary buffer */
-    allocated_buffer = calloc (max_size, 1);
-    if( NULL == allocated_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
-    tmp_buffer = allocated_buffer - gap;
-    max_size = ext * rcount;

-    /* in-place alltoall slow algorithm (but works) */
-    for (i = 0 ; i < size ; ++i) {
-        for (j = i+1 ; j < size ; ++j) {
-            if (i == rank) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
-                                                           (char *) rbuf + j * max_size);
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * j, rcount, rdtype,
-                                          j, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype,
-                                         j, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-            } else if (j == rank) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
-                                                           (char *) rbuf + i * max_size);
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * i, rcount, rdtype,
-                                          i, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype,
-                                         i, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-            } else {
-                continue;
-            }
-
-            /* Wait for the requests to complete */
-            err = ompi_request_wait ( &req, MPI_STATUSES_IGNORE);
-            if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
+    tmp_buffer = calloc (max_size, 1);
+    if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }

+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
+        uint32_t iov_count = 1;
+
+        right = (rank + i) % size;
+        left  = (rank + size - i) % size;
+
+        ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
+        opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
+        opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcount,
+                                        (char *) rbuf + right * extent);
+        packed_size = max_size;
+        err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+        if (1 != err) { goto error_hndl; }
+
+        /* Receive data from the right */
+        err = MCA_PML_CALL(irecv ((char *) rbuf + right * extent, rcount, rdtype,
+                                  right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+        if (MPI_SUCCESS != err) { goto error_hndl; }
+
+        if( left != right ) {
+            /* Send data to the left */
+            err = MCA_PML_CALL(send ((char *) rbuf + left * extent, rcount, rdtype,
+                                     left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            /* Receive data from the left */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + left * extent, rcount, rdtype,
+                                      left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        /* Send data to the right */
+        err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
+                                 right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                 comm));
+        if (MPI_SUCCESS != err) { goto error_hndl; }
+
+        err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+        if (MPI_SUCCESS != err) { goto error_hndl; }
+    }

 error_hndl:
    /* Free the temporary buffer */
-    if( NULL != allocated_buffer )
-        free (allocated_buffer);
+    if( NULL != tmp_buffer )
+        free (tmp_buffer);

    if( MPI_SUCCESS != err ) {
        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
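A note on the schedule, easiest to see in isolation: the sketch below (a standalone
illustration, not Open MPI code; the communicator size is a hypothetical value)
enumerates the pairs produced by the new loop. In step i every rank exchanges with
its neighbors at distance i on the logical ring; when size is even, the last step
has left == right, so a single exchange suffices.

#include <stdio.h>

int main(void)
{
    const int size = 6;  /* hypothetical communicator size */

    for (int rank = 0; rank < size; rank++) {
        printf("rank %d:", rank);
        for (int i = 1; i <= (size >> 1); i++) {
            int right = (rank + i) % size;         /* block that must be packed first */
            int left  = (rank + size - i) % size;  /* block sent straight from rbuf */
            if (left == right)
                printf("  [step %d: single exchange with %d]", i, right);
            else
                printf("  [step %d: right=%d left=%d]", i, right, left);
        }
        printf("\n");
    }
    return 0;
}

Both peers of a step want a block that currently sits in the receive buffer, but only
the block headed right is packed into tmp_buffer: the irecv from the right overwrites
that slot first, while the block headed left can still be sent directly from rbuf
(with the original datatype and count) before the receive from the left lands. This
is why a single temporary block of max_size bytes suffices for the whole collective.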
150 changes: 93 additions & 57 deletions ompi/mca/coll/base/coll_base_alltoallv.c
@@ -3,7 +3,7 @@
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation. All rights reserved.
- * Copyright (c) 2004-2017 The University of Tennessee and The University
+ * Copyright (c) 2004-2021 The University of Tennessee and The University
 *                         of Tennessee Research Foundation. All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -37,85 +37,121 @@
#include "coll_base_topo.h"
#include "coll_base_util.h"

+/*
+ * We want to minimize the amount of temporary memory needed while allowing as
+ * many ranks as possible to exchange data simultaneously. We use a variation of
+ * the ring algorithm, where in a single step a process exchanges data with both
+ * neighbors at distance k (on the left and the right of a logical ring topology).
+ * With this approach we only need to pack the data for one of the two neighbors,
+ * as we can then use the original buffer (and datatype and count) to send the
+ * data to the other.
+ */
int
mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
                                            struct ompi_datatype_t *rdtype,
                                            struct ompi_communicator_t *comm,
                                            mca_coll_base_module_t *module)
{
-    int i, j, size, rank, err=MPI_SUCCESS;
-    char *allocated_buffer, *tmp_buffer;
-    size_t max_size;
-    ptrdiff_t ext, gap = 0;
+    int i, size, rank, left, right, err = MPI_SUCCESS, line;
+    ompi_request_t *req;
+    char *tmp_buffer;
+    size_t packed_size = 0, max_size;
+    opal_convertor_t convertor;

    /* Initialize. */

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

-    /* If only one process, we're done. */
-    if (1 == size) {
+    ompi_datatype_type_size(rdtype, &max_size);
+    max_size *= rcounts[rank];
+
+    /* Easy way out */
+    if ((1 == size) || (0 == max_size) ) {
        return MPI_SUCCESS;
    }
-    /* Find the largest receive amount */
-    ompi_datatype_type_extent (rdtype, &ext);
-    for (i = 0, max_size = 0 ; i < size ; ++i) {
-        if (i == rank) {
-            continue;
-        }
-        size_t cur_size = opal_datatype_span(&rdtype->super, rcounts[i], &gap);
-        max_size = cur_size > max_size ? cur_size : max_size;
-    }
-    /* The gap will always be the same as we are working on the same datatype */
-
-    if (OPAL_UNLIKELY(0 == max_size)) {
-        return MPI_SUCCESS;
+    /* Find the largest amount of packed send/recv data among all peers where
+     * we need to pack before the send.
+     */
+#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        right = (rank + i) % size;
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);
+
+        if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) {
+            packed_size = opal_datatype_compute_remote_size(&rdtype->super,
+                                                            ompi_proc->super.proc_convertor->master->remote_sizes);
+            packed_size *= rcounts[right];
+            max_size = packed_size > max_size ? packed_size : max_size;
+        }
+    }
+#endif  /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */

    /* Allocate a temporary buffer */
-    allocated_buffer = calloc (max_size, 1);
-    if (NULL == allocated_buffer) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-    tmp_buffer = allocated_buffer - gap;
-
-    /* Initiate all send/recv to/from others. */
-    /* in-place alltoallv slow algorithm (but works) */
-    for (i = 0 ; i < size ; ++i) {
-        for (j = i+1 ; j < size ; ++j) {
-            if (i == rank && 0 != rcounts[j]) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[j],
-                                                           tmp_buffer, (char *) rbuf + rdisps[j] * ext);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[j], rdtype,
-                                                     j, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     (char *)rbuf + rdisps[j] * ext, rcounts[j], rdtype,
-                                                     j, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     comm, MPI_STATUS_IGNORE);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else if (j == rank && 0 != rcounts[i]) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[i],
-                                                           tmp_buffer, (char *) rbuf + rdisps[i] * ext);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[i], rdtype,
-                                                     i, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     (char *) rbuf + rdisps[i] * ext, rcounts[i], rdtype,
-                                                     i, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     comm, MPI_STATUS_IGNORE);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            }
+    tmp_buffer = calloc (max_size, 1);
+    if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }

+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
+        uint32_t iov_count = 1;
+
+        right = (rank + i) % size;
+        left  = (rank + size - i) % size;
+
+        if( 0 != rcounts[right] ) {  /* nothing to exchange with the peer on the right */
+            ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
+            opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
+            opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcounts[right],
+                                            (char *) rbuf + rdisps[right]);
+            packed_size = max_size;
+            err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+            if (1 != err) { goto error_hndl; }
+
+            /* Receive data from the right */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtype,
+                                      right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( (left != right) && (0 != rcounts[left]) ) {
+            /* Send data to the left */
+            err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtype,
+                                     left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            /* Receive data from the left */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtype,
+                                      left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( 0 != rcounts[right] ) {  /* nothing to exchange with the peer on the right */
+            /* Send data to the right */
+            err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
+                                     right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+    }

 error_hndl:
    /* Free the temporary buffer */
-    free (allocated_buffer);
+    if( NULL != tmp_buffer )
+        free (tmp_buffer);

+    if( MPI_SUCCESS != err ) {
+        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
+                     "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
+                     rank));
+        (void)line;  // silence compiler warning
+    }

    /* All done */
    return err;
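The alltoallv variant follows the same ring schedule but consults the per-peer
counts, skipping the pack, send, or receive for any peer whose count is zero. A
standalone sketch (illustration only; rank, size, and rcounts are hypothetical
values, not taken from the commit) of which operations one rank posts per step:

#include <stdio.h>

int main(void)
{
    const int size = 4;
    const int rank = 0;                    /* view the schedule from one rank */
    const int rcounts[] = { 1, 2, 0, 5 };  /* hypothetical per-peer counts */

    for (int i = 1; i <= (size >> 1); i++) {
        int right = (rank + i) % size;
        int left  = (rank + size - i) % size;

        if (0 != rcounts[right])
            printf("step %d: pack block %d, recv from %d, send packed block to %d\n",
                   i, right, right, right);
        if ((left != right) && (0 != rcounts[left]))
            printf("step %d: send block %d straight from rbuf, recv from %d\n",
                   i, left, left);
    }
    return 0;
}

With MPI_IN_PLACE the rcounts/rdisps arrays describe both what a rank sends to a
peer and what it receives back, so a zero count means the pair has nothing to
exchange in either direction and both sides of that step can be skipped.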