Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct error when a parallel application writes different amounts of da... #4

Merged
merged 1 commit into from
Aug 19, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/nc4internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ typedef struct NC_HDF5_FILE_INFO
{
NC* controller;
hid_t hdfid;
#ifdef USE_PARALLEL
MPI_Comm comm; /* Copy of MPI Communicator used to open the file */
MPI_Info info; /* Copy of MPI Information Object used to open the file */
#endif
int flags;
int cmode;
int nvars;
Expand Down
3 changes: 2 additions & 1 deletion include/netcdf.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,9 @@ by the desired type. */
#define NC_ENOTBUILT (-128) /**< Attempt to use feature that was not turned on when netCDF was built. */
#define NC_EDISKLESS (-129) /**< Error in using diskless access. */
#define NC_ECANTEXTEND (-130) /**< Attempt to extend dataset during ind. I/O operation. */
#define NC_EMPI (-131) /**< MPI operation failed. */

#define NC4_LAST_ERROR (-130)
#define NC4_LAST_ERROR (-131)

/* This is used in netCDF-4 files for dimensions without coordinate
* vars. */
Expand Down
66 changes: 59 additions & 7 deletions libsrc4/nc4file.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info,
FILE *fp;
int retval = NC_NOERR;
NC_HDF5_FILE_INFO_T* nc4_info = NULL;
#ifndef USE_PARALLEL
#ifdef USE_PARALLEL
int comm_duped = 0; /* Whether the MPI Communicator was duplicated */
int info_duped = 0; /* Whether the MPI Info object was duplicated */
#else /* !USE_PARALLEL */
int persist = 0; /* Should diskless try to persist its data into file?*/
#endif

Expand Down Expand Up @@ -294,6 +297,22 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info,
if (H5Pset_fapl_mpiposix(fapl_id, comm, 0) < 0)
BAIL(NC_EPARINIT);
}

/* Keep copies of the MPI Comm & Info objects */
if (MPI_SUCCESS != MPI_Comm_dup(comm, &nc4_info->comm))
BAIL(NC_EMPI);
comm_duped++;
if (MPI_INFO_NULL != info)
{
if (MPI_SUCCESS != MPI_Info_dup(info, &nc4_info->info))
BAIL(NC_EMPI);
info_duped++;
}
else
{
/* No dup, just copy it. */
nc4_info->info = info;
}
}
#else /* only set cache for non-parallel... */
if(cmode & NC_DISKLESS) {
Expand Down Expand Up @@ -353,6 +372,10 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info,
return NC_NOERR;

exit: /*failure exit*/
#ifdef USE_PARALLEL
if (comm_duped) MPI_Comm_free(&nc4_info->comm);
if (info_duped) MPI_Info_free(&nc4_info->info);
#endif
#ifdef EXTRA_TESTS
num_plists--;
#endif
Expand Down Expand Up @@ -2302,6 +2325,10 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm,
H5F_ACC_RDWR : H5F_ACC_RDONLY;
int retval;
NC_HDF5_FILE_INFO_T* nc4_info = NULL;
#ifdef USE_PARALLEL
int comm_duped = 0; /* Whether the MPI Communicator was duplicated */
int info_duped = 0; /* Whether the MPI Info object was duplicated */
#endif /* USE_PARALLEL */

LOG((3, "nc4_open_file: path %s mode %d", path, mode));
assert(path && nc);
Expand Down Expand Up @@ -2349,6 +2376,22 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm,
if (H5Pset_fapl_mpiposix(fapl_id, comm, 0) < 0)
BAIL(NC_EPARINIT);
}

/* Keep copies of the MPI Comm & Info objects */
if (MPI_SUCCESS != MPI_Comm_dup(comm, &nc4_info->comm))
BAIL(NC_EMPI);
comm_duped++;
if (MPI_INFO_NULL != info)
{
if (MPI_SUCCESS != MPI_Info_dup(info, &nc4_info->info))
BAIL(NC_EMPI);
info_duped++;
}
else
{
/* No dup, just copy it. */
nc4_info->info = info;
}
}
#else /* only set cache for non-parallel. */
if (H5Pset_cache(fapl_id, 0, nc4_chunk_cache_nelems, nc4_chunk_cache_size,
Expand Down Expand Up @@ -2397,11 +2440,15 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm,

return NC_NOERR;

exit:
if (fapl_id != H5P_DEFAULT) H5Pclose(fapl_id);
exit:
#ifdef USE_PARALLEL
if (comm_duped) MPI_Comm_free(&nc4_info->comm);
if (info_duped) MPI_Info_free(&nc4_info->info);
#endif
#ifdef EXTRA_TESTS
num_plists--;
#endif
if (fapl_id != H5P_DEFAULT) H5Pclose(fapl_id);
if (!nc4_info) return retval;
close_netcdf4_file(nc4_info,1); /* treat like abort*/
#if 0
Expand Down Expand Up @@ -3115,6 +3162,15 @@ close_netcdf4_file(NC_HDF5_FILE_INFO_T *h5, int abort)
}
else
{
#ifdef USE_PARALLEL
/* Free the MPI Comm & Info objects, if we opened the file in parallel */
if(h5->parallel)
{
MPI_Comm_free(&h5->comm);
if(MPI_INFO_NULL != h5->info)
MPI_Info_free(&h5->info);
}
#endif
if (H5Fclose(h5->hdfid) < 0)
{
int nobjs;
Expand All @@ -3133,10 +3189,6 @@ close_netcdf4_file(NC_HDF5_FILE_INFO_T *h5, int abort)
retval = NC_EHDFERR; goto done;
}
}
#if 0
if (H5garbage_collect() < 0)
{retval = NC_EHDFERR; goto done;
#endif
}

done:
Expand Down
34 changes: 27 additions & 7 deletions libsrc4/nc4hdf.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
hid_t file_spaceid = 0, mem_spaceid = 0, xfer_plistid = 0;
size_t file_type_size;

hsize_t *xtend_size = NULL, count[NC_MAX_VAR_DIMS];
hsize_t xtend_size[NC_MAX_VAR_DIMS] , count[NC_MAX_VAR_DIMS];
hsize_t fdims[NC_MAX_VAR_DIMS], fmaxdims[NC_MAX_VAR_DIMS];
hsize_t start[NC_MAX_VAR_DIMS];
int need_to_extend = 0;
Expand Down Expand Up @@ -748,8 +748,6 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
it to that size. */
if (var->ndims)
{
if (!(xtend_size = malloc(var->ndims * sizeof(hsize_t))))
BAIL(NC_ENOMEM);
for (d2 = 0; d2 < var->ndims; d2++)
{
if ((retval = nc4_find_dim(grp, var->dimids[d2], &dim, NULL)))
Expand All @@ -776,14 +774,37 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
}
}

#ifdef USE_PARALLEL
/* Check if anyone wants to extend */
if (h5->parallel && NC_COLLECTIVE == var->parallel_access)
{
/* Form consensus opinion among all processes about whether to extend
* the dataset
*/
if(MPI_SUCCESS != MPI_Allreduce(MPI_IN_PLACE, &need_to_extend, 1, MPI_INT, MPI_BOR, h5->comm))
BAIL(NC_EMPI);
}
#endif /* USE_PARALLEL */

/* If we need to extend it, we also need a new file_spaceid
to reflect the new size of the space. */
if (need_to_extend)
{
LOG((4, "extending dataset"));
#ifdef USE_PARALLEL
if (h5->parallel && NC_COLLECTIVE != var->parallel_access)
BAIL(NC_ECANTEXTEND);
if (h5->parallel)
{
if(NC_COLLECTIVE != var->parallel_access)
BAIL(NC_ECANTEXTEND);

/* Reach consensus about dimension sizes to extend to */
/* (Note: Somewhat hackish, with the use of MPI_BYTE, but MPI_MAX is
* correct with this usage, as long as it's not executed on
* heterogeneous systems)
*/
if(MPI_SUCCESS != MPI_Allreduce(MPI_IN_PLACE, &xtend_size, (var->ndims * sizeof(hsize_t)), MPI_BYTE, MPI_MAX, h5->comm))
BAIL(NC_EMPI);
}
#endif /* USE_PARALLEL */
if (H5Dset_extent(var->hdf_datasetid, xtend_size) < 0)
BAIL(NC_EHDFERR);
Expand Down Expand Up @@ -850,7 +871,6 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
#ifndef HDF5_CONVERT
if (need_to_convert) free(bufr);
#endif
if (xtend_size) free(xtend_size);

/* If there was an error return it, otherwise return any potential
range error value. If none, return NC_NOERR as usual.*/
Expand Down Expand Up @@ -2415,7 +2435,7 @@ write_dim(NC_DIM_INFO_T *dim, NC_GRP_INFO_T *grp, int write_dimid)
}
}
}
if (H5Dextend(v1->hdf_datasetid, new_size) < 0)
if (H5Dset_extent(v1->hdf_datasetid, new_size) < 0)
BAIL(NC_EHDFERR);
free(new_size);
}
Expand Down
86 changes: 72 additions & 14 deletions nc_test4/tst_parallel3.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ int main(int argc, char **argv)
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("\n*** Testing more advanced parallel access.\n");

for (i = 0; i < 16; i++){
Expand All @@ -101,56 +101,68 @@ int main(int argc, char **argv)
sprintf(file_name, "%s/%s", TEMP_LARGE, FILE_NAME);

/* Test NetCDF4 with MPI-IO driver */
if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for raw-data with MPI-IO (driver)...");
if(test_pio(NC_INDEPENDENT)!=0) ERR;
if(test_pio(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for meta-data with MPI-IO (driver)...");
if(test_pio_attr(NC_INDEPENDENT)!=0) ERR;
if(test_pio_attr(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for different hyperslab selections with MPI-IO (driver)...");
if(test_pio_hyper(NC_INDEPENDENT)!=0)ERR;
if(test_pio_hyper(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for extending variables with MPI-IO (driver)...");
if(test_pio_extend(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 0)
printf("*** Testing parallel IO for raw-data with MPIPOSIX-IO (driver)...");
facc_type = NC_NETCDF4|NC_MPIPOSIX;
facc_type_open = NC_MPIPOSIX;
if(test_pio(NC_INDEPENDENT)!=0) ERR;
if(test_pio(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for meta-data with MPIPOSIX-IO (driver)...");
if(test_pio_attr(NC_INDEPENDENT)!=0) ERR;
if(test_pio_attr(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for different hyperslab selections "
"with MPIPOSIX-IO (driver)...");
if(test_pio_hyper(NC_INDEPENDENT)!=0)ERR;
if(test_pio_hyper(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 0)
printf("*** Testing parallel IO for extending variables with MPIPOSIX-IO (driver)...");
if(test_pio_extend(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 0)
SUMMARIZE_ERR;

/* if(!getenv_all(MPI_COMM_WORLD,0,"NETCDF4_NOCLEANUP")) */
remove(file_name);
MPI_Finalize();

if (mpi_rank == 1)
if (mpi_rank == 0)
FINAL_RESULTS;
return 0;
}
Expand Down Expand Up @@ -686,6 +698,52 @@ int test_pio_hyper(int flag){
return 0;
}

/*
 * Test extending a variable along an unlimited dimension in parallel.
 *
 * Creates "test.nc" with a fixed-size "partitions" dimension (one entry per
 * MPI process) and an unlimited "vertices" dimension, defines a 2-D NC_INT
 * variable over them, and has every process write one row whose length is
 * derived from its own rank.  Processes therefore write different amounts
 * of data along the unlimited dimension (rank 0 writes nothing).
 *
 * flag: parallel access mode handed to nc_var_par_access(); the caller in
 *       this file passes NC_COLLECTIVE.
 *
 * Returns 0 when the end of the sequence is reached.  Each failing netCDF
 * call invokes the ERR macro (defined elsewhere).
 *
 * NOTE(review): the file is always created as "test.nc" with
 * NC_NETCDF4 | NC_MPIIO, even when the caller announces the MPIPOSIX
 * driver -- confirm whether file_name / facc_type should be used instead.
 */
int test_pio_extend(int flag){
    int rank, procs;
    int ncFile;
    int ncDimPart;
    int ncDimVrtx;
    int ncVarVrtx;
    int dimsVrtx[2];
    size_t start[2];
    size_t count[2];
    int vertices[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    /* Number of elements available in the write buffer. */
    const size_t nvertices = sizeof(vertices) / sizeof(vertices[0]);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &procs);

    /* Create netcdf file */
    if (nc_create_par("test.nc", NC_NETCDF4 | NC_MPIIO, MPI_COMM_WORLD, MPI_INFO_NULL, &ncFile)) ERR;

    /* Create netcdf dimensions */
    if (nc_def_dim(ncFile, "partitions", procs, &ncDimPart)) ERR;
    if (nc_def_dim(ncFile, "vertices", NC_UNLIMITED, &ncDimVrtx)) ERR;

    /* Create netcdf variables */
    dimsVrtx[0] = ncDimPart;
    dimsVrtx[1] = ncDimVrtx;
    if (nc_def_var(ncFile, "vertex", NC_INT, 2, dimsVrtx, &ncVarVrtx)) ERR;

    /* Start writing data */
    if (nc_enddef(ncFile)) ERR;

    /* Set access mode */
    if (nc_var_par_access(ncFile, ncVarVrtx, flag)) ERR;

    /* Write vertices: each process writes row `rank`, `rank` elements long.
     * The length is clamped to the size of the buffer so that runs with
     * more processes than buffer elements do not read past `vertices`. */
    start[0] = rank;
    start[1] = 0;
    count[0] = 1;
    count[1] = ((size_t)rank < nvertices) ? (size_t)rank : nvertices;
    if (nc_put_vara_int(ncFile, ncVarVrtx, start, count, vertices)) ERR;

    /* Close netcdf file */
    if (nc_close(ncFile)) ERR;

    return 0;
}

/*-------------------------------------------------------------------------
* Function: getenv_all
*
Expand Down