Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate MPI errors from underlying operations #10

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions src/collective/allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string.h>
#include <math.h>
#include "utils.h"
#include "error.h"


int MPIX_Allgather(const void* sendbuf,
Expand Down Expand Up @@ -466,25 +467,27 @@ int allgather_hier_bruck(const void *sendbuf, int sendcount, MPI_Datatype sendty
void *recvbuf, int recvcount, MPI_Datatype recvtype, MPIX_Comm* comm)
{
int num_procs;
MPI_Comm_size(comm->global_comm, &num_procs);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm->global_comm, &num_procs));

int recv_size;
MPI_Type_size(recvtype, &recv_size);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Type_size(recvtype, &recv_size));

int local_rank, PPN;
MPI_Comm_rank(comm->local_comm, &local_rank);
MPI_Comm_size(comm->local_comm, &PPN);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_rank(comm->local_comm, &local_rank));
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm->local_comm, &PPN));

char* tmpbuf = (char*)malloc(recvcount*num_procs*recv_size*sizeof(char));

gather(sendbuf, sendcount, sendtype, tmpbuf, recvcount, recvtype, 0, comm->local_comm);
MPI_ADVANCE_SUCCESS_OR_RETURN(gather(sendbuf, sendcount, sendtype, tmpbuf, recvcount, recvtype, 0, comm->local_comm));
if (local_rank == 0)
{
allgather_bruck(tmpbuf, recvcount*PPN, recvtype, recvbuf, recvcount*PPN, recvtype, comm->group_comm);
MPI_ADVANCE_SUCCESS_OR_RETURN(allgather_bruck(tmpbuf, recvcount*PPN, recvtype, recvbuf, recvcount*PPN, recvtype, comm->group_comm));
}
bcast(recvbuf, recvcount*num_procs, recvtype, 0, comm->local_comm);
MPI_ADVANCE_SUCCESS_OR_RETURN(bcast(recvbuf, recvcount*num_procs, recvtype, 0, comm->local_comm));

free(tmpbuf);

return MPI_SUCCESS;
}


Expand All @@ -494,23 +497,23 @@ int allgather_mult_hier_bruck(const void *sendbuf, int sendcount, MPI_Datatype s
void *recvbuf, int recvcount, MPI_Datatype recvtype, MPIX_Comm* comm)
{
int num_procs;
MPI_Comm_size(comm->global_comm, &num_procs);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm->global_comm, &num_procs));

int recv_size;
MPI_Type_size(recvtype, &recv_size);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Type_size(recvtype, &recv_size));

int group_size;
MPI_Comm_size(comm->group_comm, &group_size);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm->group_comm, &group_size));

int ppn;
MPI_Comm_size(comm->local_comm, &ppn);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm->local_comm, &ppn));

char* tmpbuf = (char*)malloc(recvcount*num_procs*recv_size);
char* recv_buffer = (char*)recvbuf;

allgather_bruck(sendbuf, sendcount, sendtype, tmpbuf, recvcount, recvtype, comm->group_comm);
allgather_bruck(tmpbuf, recvcount*group_size, recvtype,
tmpbuf, recvcount*group_size, recvtype, comm->local_comm);
MPI_ADVANCE_SUCCESS_OR_RETURN(allgather_bruck(sendbuf, sendcount, sendtype, tmpbuf, recvcount, recvtype, comm->group_comm));
MPI_ADVANCE_SUCCESS_OR_RETURN(allgather_bruck(tmpbuf, recvcount*group_size, recvtype,
tmpbuf, recvcount*group_size, recvtype, comm->local_comm));

for (int i = 0; i < ppn; i++)
{
Expand All @@ -528,6 +531,7 @@ int allgather_mult_hier_bruck(const void *sendbuf, int sendcount, MPI_Datatype s
}

free(tmpbuf);
return MPI_SUCCESS;
}


Expand Down
15 changes: 9 additions & 6 deletions src/collective/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <string.h>
#include <math.h>
#include "utils.h"
#include "error.h"

// TODO : Add Locality-Aware Bruck Alltoall Algorithm!
// TODO : Change to PMPI_Alltoall and test with profiling library!
Expand Down Expand Up @@ -64,17 +65,17 @@ int alltoall_pairwise(const void* sendbuf,
MPI_Comm comm)
{
int rank, num_procs;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &num_procs);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_rank(comm, &rank));
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm, &num_procs));

int tag = 102944;
int send_proc, recv_proc;
int send_pos, recv_pos;
MPI_Status status;

int send_size, recv_size;
MPI_Type_size(sendtype, &send_size);
MPI_Type_size(recvtype, &recv_size);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Type_size(sendtype, &send_size));
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Type_size(recvtype, &recv_size));

// Send to rank + i
// Recv from rank - i
Expand All @@ -89,10 +90,12 @@ int alltoall_pairwise(const void* sendbuf,
send_pos = send_proc * sendcount * send_size;
recv_pos = recv_proc * recvcount * recv_size;

MPI_Sendrecv(sendbuf + send_pos, sendcount, sendtype, send_proc, tag,
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Sendrecv(sendbuf + send_pos, sendcount, sendtype, send_proc, tag,
recvbuf + recv_pos, recvcount, recvtype, recv_proc, tag,
comm, &status);
comm, &status));
}

return MPI_SUCCESS;
}

int alltoall_bruck(const void* sendbuf,
Expand Down
10 changes: 6 additions & 4 deletions src/collective/bcast.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "bcast.h"
#include <math.h>
#include "error.h"

// TODO : currently root is always 0
int bcast(void* buffer,
Expand All @@ -9,8 +10,8 @@ int bcast(void* buffer,
MPI_Comm comm)
{
int rank, num_procs;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &num_procs);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_rank(comm, &rank));
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm, &num_procs));

int num_steps = log2(num_procs);
int tag = 204857;
Expand All @@ -22,14 +23,15 @@ int bcast(void* buffer,
if (rank % (stride*2) == 0)
{
// Sending Proc
MPI_Send(buffer, count, datatype, rank + stride, tag, comm);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Send(buffer, count, datatype, rank + stride, tag, comm));
}
else if (rank % stride == 0)
{
// Recving Proc
MPI_Recv(buffer, count, datatype, rank - stride, tag, comm, &status);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Recv(buffer, count, datatype, rank - stride, tag, comm, &status));
}

stride /= 2;
}
return MPI_SUCCESS;
}
15 changes: 9 additions & 6 deletions src/collective/gather.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "gather.h"
#include <string.h>
#include <math.h>
#include "error.h"


// TODO : Currently root is always 0
int gather(const void* sendbuf,
Expand All @@ -13,11 +15,11 @@ int gather(const void* sendbuf,
MPI_Comm comm)
{
int rank, num_procs;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &num_procs);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_rank(comm, &rank));
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Comm_size(comm, &num_procs));

int recv_size;
MPI_Type_size(recvtype, &recv_size);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Type_size(recvtype, &recv_size));

int num_steps = log2(num_procs);
int tag = 204857;
Expand All @@ -32,17 +34,18 @@ int gather(const void* sendbuf,
if (rank % (stride*2))
{
// Sending Proc
MPI_Send(recvbuf, recvcount*stride, recvtype, rank - stride, tag, comm);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Send(recvbuf, recvcount*stride, recvtype, rank - stride, tag, comm));
break;
}
else
{
// Recving Proc
MPI_Recv(&(recv_buffer[recvcount*stride*recv_size]), recvcount*stride, recvtype,
rank + stride, tag, comm, &status);
MPI_ADVANCE_SUCCESS_OR_RETURN(MPI_Recv(&(recv_buffer[recvcount*stride*recv_size]), recvcount*stride, recvtype,
rank + stride, tag, comm, &status));
}

stride *= 2;
}
return MPI_SUCCESS;
}

6 changes: 6 additions & 0 deletions src/error.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#pragma once

#include <mpi.h>

#define MPI_ADVANCE_SUCCESS_OR_RETURN(_code) \
{if (MPI_SUCCESS != _code) return _code; }