Skip to content

Commit

Permalink
OFI: Add missing thread-completion mutexes
Browse files Browse the repository at this point in the history
Signed-off-by: James Dinan <james.dinan@intel.com>
  • Loading branch information
James Dinan committed Feb 1, 2018
1 parent 6ac5116 commit 72e06fd
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void
shmem_internal_assert(len <= sizeof(double _Complex));
shmem_internal_assert(SHMEM_Dtsize[datatype] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr);

do {
Expand All @@ -802,6 +803,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void
FI_CSWAP,
NULL);
} while (try_again(ctx, ret, &polled));
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}


Expand All @@ -820,6 +822,7 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void
shmem_internal_assert(len <= sizeof(double _Complex));
shmem_internal_assert(SHMEM_Dtsize[datatype] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr);

do {
Expand All @@ -838,6 +841,7 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void
FI_MSWAP,
NULL);
} while (try_again(ctx, ret, &polled));
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}


Expand All @@ -855,6 +859,7 @@ void shmem_transport_atomic_small(shmem_transport_ctx_t* ctx, void *target, cons

shmem_internal_assert(SHMEM_Dtsize[datatype] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);

do {
Expand All @@ -867,6 +872,7 @@ void shmem_transport_atomic_small(shmem_transport_ctx_t* ctx, void *target, cons
datatype,
op);
} while (try_again(ctx, ret, &polled));
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}


Expand All @@ -884,6 +890,7 @@ void shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target, const

shmem_internal_assert(SHMEM_Dtsize[datatype] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);
do {
ret = fi_inject_atomic(ctx->cntr_ep,
Expand All @@ -895,6 +902,7 @@ void shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target, const
datatype,
FI_ATOMIC_WRITE);
} while (try_again(ctx, ret, &polled));
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}


Expand All @@ -912,6 +920,7 @@ void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, cons

shmem_internal_assert(SHMEM_Dtsize[datatype] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr);

do {
Expand All @@ -928,6 +937,7 @@ void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, cons
FI_ATOMIC_READ,
NULL);
} while (try_again(ctx, ret, &polled));
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}


Expand All @@ -946,11 +956,13 @@ void shmem_transport_atomic_nb(shmem_transport_ctx_t* ctx, void *target, const v

shmem_internal_assert(SHMEM_Dtsize[datatype] * len == full_len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
ret = fi_atomicvalid(ctx->cntr_ep, datatype, op,
&max_atomic_size);
max_atomic_size = max_atomic_size * SHMEM_Dtsize[datatype];
if (max_atomic_size > shmem_transport_ofi_max_msg_size
|| ret || max_atomic_size == 0) {
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
RAISE_ERROR_MSG("Atomic operation with datatype %d and op %d not supported\n",
datatype, op);
}
Expand Down Expand Up @@ -1024,6 +1036,7 @@ void shmem_transport_atomic_nb(shmem_transport_ctx_t* ctx, void *target, const v
sent += chunksize;
}
}
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}


Expand All @@ -1042,6 +1055,7 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, cons
shmem_internal_assert(len <= sizeof(double _Complex));
shmem_internal_assert(SHMEM_Dtsize[datatype] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr);

do {
Expand All @@ -1058,18 +1072,28 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, cons
op,
NULL);
} while (try_again(ctx, ret, &polled));
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}


/* detect atomic limitation on the fly and provide software reduction support
if needed */
/* Query transport layer to detemine if the given combination of <op, datatype>
* is supported as a one-sided atomic operation. This is used by reductions to
* check whether to fall back to software reductions for things like double
* complex product reductions. */
static inline
int shmem_transport_atomic_supported(shm_internal_op_t op,
shm_internal_datatype_t datatype)
{
size_t size = 0;

/* NOTE-MT: It's not clear from the OFI documentation whether this mutex is
* actually required by FI_THREAD_COMPLETION. */

SHMEM_TRANSPORT_OFI_CTX_LOCK(&shmem_transport_ctx_default);
int ret = fi_atomicvalid(shmem_transport_ctx_default.cntr_ep,
datatype, op, &size);
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(&shmem_transport_ctx_default);

return !(ret != 0 || size == 0);
}

Expand Down Expand Up @@ -1125,6 +1149,8 @@ uint64_t shmem_transport_received_cntr_get(void)
{
#ifndef ENABLE_HARD_POLLING
shmem_internal_assert(shmem_internal_thread_level == SHMEM_THREAD_SINGLE);
/* NOTE-MT: This is only reachable in single-threaded runs, otherwise
* we would need a mutex to support FI_THREAD_COMPLETION builds. */
return fi_cntr_read(shmem_transport_ofi_target_cntrfd);
#else
RAISE_ERROR_STR("OFI transport configured for hard polling");
Expand All @@ -1137,6 +1163,8 @@ void shmem_transport_received_cntr_wait(uint64_t ge_val)
{
#ifndef ENABLE_HARD_POLLING
shmem_internal_assert(shmem_internal_thread_level == SHMEM_THREAD_SINGLE);
/* NOTE-MT: This is only reachable in single-threaded runs, otherwise
* we would need a mutex to support FI_THREAD_COMPLETION builds. */
int ret = fi_cntr_wait(shmem_transport_ofi_target_cntrfd, ge_val, -1);

OFI_CHECK_ERROR(ret);
Expand Down

0 comments on commit 72e06fd

Please sign in to comment.