Skip to content

Commit

Permalink
UCP/RMA: Report error if doing RMA for CUDA/RoCM memory buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Nov 13, 2019
1 parent 21072b9 commit a8e284a
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 60 deletions.
45 changes: 43 additions & 2 deletions src/tools/perf/cuda/cuda_alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,64 @@ static inline ucs_status_t ucx_perf_cuda_alloc(size_t length,
return UCS_OK;
}

static ucs_status_t
ucp_perf_cuda_alloc_mapped(const ucx_perf_context_t *perf,
ucs_memory_type_t mem_type, size_t length,
void **address_p, ucp_mem_h *memh_p,
int non_blk_flag)
{
ucp_mem_map_params_t mem_map_params;
ucs_status_t status;

status = ucx_perf_cuda_alloc(length, mem_type, address_p);
if (status != UCS_OK) {
return status;
}

mem_map_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
UCP_MEM_MAP_PARAM_FIELD_LENGTH;
mem_map_params.address = *address_p;
mem_map_params.length = length;
if (perf->params.flags & UCX_PERF_TEST_FLAG_MAP_NONBLOCK) {
mem_map_params.flags = non_blk_flag;
mem_map_params.field_mask |= UCP_MEM_MAP_PARAM_FIELD_FLAGS;
}

status = ucp_mem_map(perf->ucp.context, &mem_map_params, memh_p);
if (status != UCS_OK) {
goto err;
}

return UCS_OK;

err:
cudaFree(*address_p);
return status;
}

static ucs_status_t ucp_perf_cuda_alloc(const ucx_perf_context_t *perf, size_t length,
void **address_p, ucp_mem_h *memh_p,
int non_blk_flag)
{
return ucx_perf_cuda_alloc(length, UCS_MEMORY_TYPE_CUDA, address_p);
return ucp_perf_cuda_alloc_mapped(perf, UCS_MEMORY_TYPE_CUDA,
length, address_p, memh_p, non_blk_flag);
}

static ucs_status_t ucp_perf_cuda_alloc_managed(const ucx_perf_context_t *perf,
size_t length, void **address_p,
ucp_mem_h *memh_p, int non_blk_flag)
{
return ucx_perf_cuda_alloc(length, UCS_MEMORY_TYPE_CUDA_MANAGED, address_p);
return ucp_perf_cuda_alloc_mapped(perf, UCS_MEMORY_TYPE_CUDA_MANAGED,
length, address_p, memh_p, non_blk_flag);
}

static void ucp_perf_cuda_free(const ucx_perf_context_t *perf,
void *address, ucp_mem_h memh)
{
ucs_status_t status = ucp_mem_unmap(perf->ucp.context, memh);
if (status != UCS_OK) {
ucs_warn("ucp_mem_unmap() failed: %s", ucs_status_string(status));
}
cudaFree(address);
}

Expand Down
24 changes: 21 additions & 3 deletions src/tools/perf/lib/libperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,22 @@ static ucs_status_t ucx_perf_test_check_params(ucx_perf_params_t *params)
return UCS_ERR_INVALID_PARAM;
}

if ((params->api == UCX_PERF_API_UCP) &&
((params->mem_type == UCS_MEMORY_TYPE_CUDA) ||
(params->mem_type == UCS_MEMORY_TYPE_ROCM)) &&
((params->command == UCX_PERF_CMD_PUT) ||
(params->command == UCX_PERF_CMD_GET) ||
(params->command == UCX_PERF_CMD_ADD) ||
(params->command == UCX_PERF_CMD_FADD) ||
(params->command == UCX_PERF_CMD_SWAP) ||
(params->command == UCX_PERF_CMD_CSWAP))) {
if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
ucs_error("UCP doesn't support RMA/AMO for \"%s\" memory type",
ucs_memory_type_names[params->mem_type]);
}
return UCS_ERR_INVALID_PARAM;
}

if (params->max_outstanding < 1) {
if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
ucs_error("max_outstanding, need to be at least 1");
Expand Down Expand Up @@ -650,9 +666,11 @@ static ucs_status_t uct_perf_test_check_capabilities(ucx_perf_params_t *params,

if (!(md_attr.cap.access_mem_type == params->mem_type) &&
!(md_attr.cap.reg_mem_types & UCS_BIT(params->mem_type))) {
ucs_error("Unsupported memory type %s by %s/%s",
ucs_memory_type_names[params->mem_type],
params->uct.tl_name, params->uct.dev_name);
if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
ucs_error("Unsupported memory type %s by %s/%s",
ucs_memory_type_names[params->mem_type],
params->uct.tl_name, params->uct.dev_name);
}
return UCS_ERR_INVALID_PARAM;
}

Expand Down
4 changes: 3 additions & 1 deletion src/tools/perf/lib/ucp_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ class ucp_perf_test_runner {
send_started();
return UCS_OK;
case UCX_PERF_CMD_PUT:
*((uint8_t*)buffer + length - 1) = sn;
m_perf.allocator->memcpy((psn_t*)buffer + length - 1,
m_perf.allocator->mem_type, &sn,
UCS_MEMORY_TYPE_HOST, sizeof(sn));
return ucp_put(ep, buffer, length, remote_addr, rkey);
case UCX_PERF_CMD_GET:
return ucp_get(ep, buffer, length, remote_addr, rkey);
Expand Down
31 changes: 19 additions & 12 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1423,13 +1423,15 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,

/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
rma_config = &config->rma[lane];
rma_config->put_zcopy_thresh = SIZE_MAX;
rma_config->get_zcopy_thresh = SIZE_MAX;
rma_config->max_put_short = SIZE_MAX;
rma_config->max_get_short = SIZE_MAX;
rma_config->max_put_bcopy = SIZE_MAX;
rma_config->max_get_bcopy = SIZE_MAX;
rma_config = &config->rma[lane];
rma_config->max_put_short.memtype_on = -1;
rma_config->max_put_short.memtype_off = -1;
rma_config->max_get_short.memtype_on = -1;
rma_config->max_get_short.memtype_off = -1;
rma_config->put_zcopy_thresh = SIZE_MAX;
rma_config->get_zcopy_thresh = SIZE_MAX;
rma_config->max_put_bcopy = SIZE_MAX;
rma_config->max_get_bcopy = SIZE_MAX;

if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
Expand All @@ -1456,8 +1458,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
rma_config->put_zcopy_thresh);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_SHORT) {
rma_config->max_put_short = ucs_min(iface_attr->cap.put.max_short,
rma_config->max_put_bcopy);
ucp_ep_config_set_memtype_thresh(&rma_config->max_put_short,
ucs_min(iface_attr->cap.put.max_short,
rma_config->max_put_bcopy),
context->num_mem_type_detect_mds);
}

/* GET */
Expand All @@ -1477,8 +1481,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
rma_config->get_zcopy_thresh);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_SHORT) {
rma_config->max_get_short = ucs_min(iface_attr->cap.get.max_short,
rma_config->max_get_bcopy);
ucp_ep_config_set_memtype_thresh(&rma_config->max_get_short,
ucs_min(iface_attr->cap.get.max_short,
rma_config->max_get_bcopy),
context->num_mem_type_detect_mds);
}
} else {
rma_config->max_put_bcopy = UCP_MIN_BCOPY; /* Stub endpoint */
Expand Down Expand Up @@ -1720,7 +1726,8 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker,
continue;
}
ucp_ep_config_print_rma_proto(stream, "put", lane,
ucs_max(config->rma[lane].max_put_short + 1,
/* TODO: UCP RMA supports only HOST memory type */
ucs_max(config->rma[lane].max_put_short.memtype_off + 1,
config->bcopy_thresh),
config->rma[lane].put_zcopy_thresh);
ucp_ep_config_print_rma_proto(stream, "get", lane, 0,
Expand Down
22 changes: 11 additions & 11 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ enum {
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCP_EP_STAT_TAG_TX_##_op, 1);


/*
* Thresholds with and without non-host memory
*/
typedef struct ucp_memtype_thresh {
ssize_t memtype_on;
ssize_t memtype_off;
} ucp_memtype_thresh_t;


/*
* Endpoint configuration key.
* This is filled by to the transport selection logic, according to the local
Expand Down Expand Up @@ -155,10 +164,10 @@ typedef struct ucp_ep_config_key {
* Configuration for RMA protocols
*/
typedef struct ucp_ep_rma_config {
size_t max_put_short; /* Maximal payload of put short */
ucp_memtype_thresh_t max_put_short; /* Maximal payload of put short */
size_t max_put_bcopy; /* Maximal total size of put_bcopy */
size_t max_put_zcopy;
size_t max_get_short; /* Maximal payload of get short */
ucp_memtype_thresh_t max_get_short; /* Maximal payload of get short */
size_t max_get_bcopy; /* Maximal total size of get_bcopy */
size_t max_get_zcopy;
size_t put_zcopy_thresh;
Expand Down Expand Up @@ -187,15 +196,6 @@ typedef struct ucp_ep_msg_config {
} ucp_ep_msg_config_t;


/*
* Thresholds with and without non-host memory
*/
typedef struct ucp_memtype_thresh {
ssize_t memtype_on;
ssize_t memtype_off;
} ucp_memtype_thresh_t;


typedef struct ucp_ep_config {

/* A key which uniquely defines the configuration, and all other fields of
Expand Down
21 changes: 21 additions & 0 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,25 @@ static inline ucp_lane_index_t ucp_ep_get_cm_lane(ucp_ep_h ep)
return ucp_ep_config(ep)->key.cm_lane;
}

static UCS_F_ALWAYS_INLINE ssize_t
ucp_ep_get_max_inline(ucp_ep_h ep, const ucp_memtype_thresh_t *max_short)
{
if (ucs_likely(max_short->memtype_off > 0)) {
return max_short->memtype_off;
} else if (ucp_memory_type_cache_is_empty(ep->worker->context)) {
return max_short->memtype_on;
}

return -1;
}

static UCS_F_ALWAYS_INLINE int
ucp_ep_send_is_inline(ucp_ep_h ep, const ucp_memtype_thresh_t *max_short,
ssize_t length)
{
return (ucs_likely(length <= max_short->memtype_off) ||
(length <= max_short->memtype_on &&
ucp_memory_type_cache_is_empty(ep->worker->context)));
}

#endif
22 changes: 11 additions & 11 deletions src/ucp/core/ucp_mm.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@ typedef struct ucp_tl_rkey {
typedef struct ucp_rkey {
/* cached values for the most recent endpoint configuration */
struct {
ucp_ep_cfg_index_t ep_cfg_index; /* EP configuration relevant for the cache */
ucp_lane_index_t rma_lane; /* Lane to use for RMAs */
ucp_lane_index_t amo_lane; /* Lane to use for AMOs */
unsigned max_put_short;/* Cached value of max_put_short */
uct_rkey_t rma_rkey; /* Key to use for RMAs */
uct_rkey_t amo_rkey; /* Key to use for AMOs */
ucp_amo_proto_t *amo_proto; /* Protocol for AMOs */
ucp_rma_proto_t *rma_proto; /* Protocol for RMAs */
ucp_ep_cfg_index_t ep_cfg_index; /* EP configuration relevant for the cache */
ucp_lane_index_t rma_lane; /* Lane to use for RMAs */
ucp_lane_index_t amo_lane; /* Lane to use for AMOs */
ssize_t max_put_short; /* Cached value of max_put_short */
uct_rkey_t rma_rkey; /* Key to use for RMAs */
uct_rkey_t amo_rkey; /* Key to use for AMOs */
ucp_amo_proto_t *amo_proto; /* Protocol for AMOs */
ucp_rma_proto_t *rma_proto; /* Protocol for RMAs */
} cache;
ucp_md_map_t md_map; /* Which *remote* MDs have valid memory handles */
ucs_memory_type_t mem_type; /* Memory type of remote key memory */
ucp_md_map_t md_map; /* Which *remote* MDs have valid memory handles */
ucs_memory_type_t mem_type; /* Memory type of remote key memory */
#if ENABLE_PARAMS_CHECK
ucp_ep_h ep;
#endif
ucp_tl_rkey_t tl_rkey[0]; /* UCT rkey for every remote MD */
ucp_tl_rkey_t tl_rkey[0]; /* UCT rkey for every remote MD */
} ucp_rkey_t;


Expand Down
5 changes: 3 additions & 2 deletions src/ucp/core/ucp_rkey.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,13 @@ void ucp_rkey_resolve_inner(ucp_rkey_h rkey, ucp_ep_h ep)
if (rma_sw) {
rkey->cache.rma_proto = &ucp_rma_sw_proto;
rkey->cache.rma_rkey = UCT_INVALID_RKEY;
rkey->cache.max_put_short = 0;
rkey->cache.max_put_short = -1;
} else {
rkey->cache.rma_proto = &ucp_rma_basic_proto;
rkey->cache.rma_rkey = uct_rkey;
rkey->cache.rma_proto = &ucp_rma_basic_proto;
rkey->cache.max_put_short = config->rma[rkey->cache.rma_lane].max_put_short;
rkey->cache.max_put_short =
ucp_ep_get_max_inline(ep, &config->rma[rkey->cache.rma_lane].max_put_short);
}

rkey->cache.amo_lane = ucp_config_find_rma_lane(context, config,
Expand Down
8 changes: 6 additions & 2 deletions src/ucp/rma/rma_basic.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ static ucs_status_t ucp_rma_basic_progress_put(uct_pending_req_t *self)
ucp_rkey_h rkey = req->send.rma.rkey;
ucp_lane_index_t lane = req->send.lane;
ucp_ep_rma_config_t *rma_config = &ucp_ep_config(ep)->rma[lane];
ssize_t max_put_short = ucp_ep_get_max_inline(ep,
&rma_config->max_put_short);
ucs_status_t status;
ssize_t packed_len;

ucs_assert(rkey->cache.ep_cfg_index == ep->cfg_index);
ucs_assert(rkey->cache.rma_lane == lane);

if ((req->send.length <= rma_config->max_put_short) ||
if ((max_put_short > 0) &&
(req->send.length <= (size_t)max_put_short) &&
(req->send.length <= ucp_ep_config(ep)->bcopy_thresh))
{
packed_len = ucs_min(req->send.length, rma_config->max_put_short);
ucs_assert(max_put_short > 0);
packed_len = ucs_min(req->send.length, (size_t)max_put_short);
status = UCS_PROFILE_CALL(uct_ep_put_short,
ep->uct_eps[lane],
req->send.buffer,
Expand Down
21 changes: 18 additions & 3 deletions src/ucp/rma/rma_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,16 @@ ucp_rma_request_init(ucp_request_t *req, ucp_ep_h ep, const void *buffer,
req->send.ep = ep;
req->send.buffer = (void*)buffer;
req->send.datatype = ucp_dt_make_contig(1);
req->send.mem_type = UCS_MEMORY_TYPE_HOST;
req->send.mem_type = ucp_memory_type_detect(ep->worker->context,
buffer, length);
if (ucs_unlikely((req->send.mem_type == UCS_MEMORY_TYPE_CUDA) ||
(req->send.mem_type == UCS_MEMORY_TYPE_ROCM))) {
/* TODO: remove when support for CUDA/RoCM memory types will be added */
ucs_error("UCP doesn't support RMA for \"%s\" memory type",
ucs_memory_type_names[req->send.mem_type]);
return UCS_ERR_INVALID_PARAM;
}

req->send.length = length;
req->send.rma.remote_addr = remote_addr;
req->send.rma.rkey = rkey;
Expand Down Expand Up @@ -195,6 +204,12 @@ ucp_rma_nonblocking_cb(ucp_ep_h ep, const void *buffer, size_t length,
return ucp_rma_send_request_cb(req, cb);
}

int ucp_rma_put_is_inline(size_t length, ucp_rkey_h rkey)
{
return (ucs_likely((rkey->cache.max_put_short > 0) &&
(length <= (size_t)rkey->cache.max_put_short)));
}

ucs_status_t ucp_put_nbi(ucp_ep_h ep, const void *buffer, size_t length,
uint64_t remote_addr, ucp_rkey_h rkey)
{
Expand All @@ -214,7 +229,7 @@ ucs_status_t ucp_put_nbi(ucp_ep_h ep, const void *buffer, size_t length,
}

/* Fast path for a single short message */
if (ucs_likely((ssize_t)length <= (int)rkey->cache.max_put_short)) {
if (ucp_rma_put_is_inline(length, rkey)) {
status = UCS_PROFILE_CALL(uct_ep_put_short, ep->uct_eps[rkey->cache.rma_lane],
buffer, length, remote_addr, rkey->cache.rma_rkey);
if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
Expand Down Expand Up @@ -252,7 +267,7 @@ ucs_status_ptr_t ucp_put_nb(ucp_ep_h ep, const void *buffer, size_t length,
}

/* Fast path for a single short message */
if (ucs_likely((ssize_t)length <= (int)rkey->cache.max_put_short)) {
if (ucp_rma_put_is_inline(length, rkey)) {
status = UCS_PROFILE_CALL(uct_ep_put_short, ep->uct_eps[rkey->cache.rma_lane],
buffer, length, remote_addr, rkey->cache.rma_rkey);
if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
Expand Down
Loading

0 comments on commit a8e284a

Please sign in to comment.