Skip to content

Commit

Permalink
UCP/RMA: Report the error if doing RMA for non-HOST memory
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Nov 18, 2019
1 parent d2cbbc2 commit cacdf20
Show file tree
Hide file tree
Showing 23 changed files with 368 additions and 122 deletions.
24 changes: 21 additions & 3 deletions src/tools/perf/lib/libperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,22 @@ static ucs_status_t ucx_perf_test_check_params(ucx_perf_params_t *params)
return UCS_ERR_INVALID_PARAM;
}

if ((params->api == UCX_PERF_API_UCP) &&
((params->mem_type == UCS_MEMORY_TYPE_CUDA) ||
(params->mem_type == UCS_MEMORY_TYPE_ROCM)) &&
((params->command == UCX_PERF_CMD_PUT) ||
(params->command == UCX_PERF_CMD_GET) ||
(params->command == UCX_PERF_CMD_ADD) ||
(params->command == UCX_PERF_CMD_FADD) ||
(params->command == UCX_PERF_CMD_SWAP) ||
(params->command == UCX_PERF_CMD_CSWAP))) {
if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
ucs_error("UCP doesn't support RMA/AMO for \"%s\" memory type",
ucs_memory_type_names[params->mem_type]);
}
return UCS_ERR_INVALID_PARAM;
}

if (params->max_outstanding < 1) {
if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
ucs_error("max_outstanding, need to be at least 1");
Expand Down Expand Up @@ -650,9 +666,11 @@ static ucs_status_t uct_perf_test_check_capabilities(ucx_perf_params_t *params,

if (!(md_attr.cap.access_mem_type == params->mem_type) &&
!(md_attr.cap.reg_mem_types & UCS_BIT(params->mem_type))) {
ucs_error("Unsupported memory type %s by %s/%s",
ucs_memory_type_names[params->mem_type],
params->uct.tl_name, params->uct.dev_name);
if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
ucs_error("Unsupported memory type %s by %s/%s",
ucs_memory_type_names[params->mem_type],
params->uct.tl_name, params->uct.dev_name);
}
return UCS_ERR_INVALID_PARAM;
}

Expand Down
31 changes: 19 additions & 12 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1423,13 +1423,15 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,

/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
rma_config = &config->rma[lane];
rma_config->put_zcopy_thresh = SIZE_MAX;
rma_config->get_zcopy_thresh = SIZE_MAX;
rma_config->max_put_short = SIZE_MAX;
rma_config->max_get_short = SIZE_MAX;
rma_config->max_put_bcopy = SIZE_MAX;
rma_config->max_get_bcopy = SIZE_MAX;
rma_config = &config->rma[lane];
rma_config->max_put_short.memtype_on = -1;
rma_config->max_put_short.memtype_off = -1;
rma_config->max_get_short.memtype_on = -1;
rma_config->max_get_short.memtype_off = -1;
rma_config->put_zcopy_thresh = SIZE_MAX;
rma_config->get_zcopy_thresh = SIZE_MAX;
rma_config->max_put_bcopy = SIZE_MAX;
rma_config->max_get_bcopy = SIZE_MAX;

if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
Expand All @@ -1456,8 +1458,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
rma_config->put_zcopy_thresh);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_SHORT) {
rma_config->max_put_short = ucs_min(iface_attr->cap.put.max_short,
rma_config->max_put_bcopy);
ucp_ep_config_set_memtype_thresh(&rma_config->max_put_short,
ucs_min(iface_attr->cap.put.max_short,
rma_config->max_put_bcopy),
context->num_mem_type_detect_mds);
}

/* GET */
Expand All @@ -1477,8 +1481,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
rma_config->get_zcopy_thresh);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_SHORT) {
rma_config->max_get_short = ucs_min(iface_attr->cap.get.max_short,
rma_config->max_get_bcopy);
ucp_ep_config_set_memtype_thresh(&rma_config->max_get_short,
ucs_min(iface_attr->cap.get.max_short,
rma_config->max_get_bcopy),
context->num_mem_type_detect_mds);
}
} else {
rma_config->max_put_bcopy = UCP_MIN_BCOPY; /* Stub endpoint */
Expand Down Expand Up @@ -1720,7 +1726,8 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker,
continue;
}
ucp_ep_config_print_rma_proto(stream, "put", lane,
ucs_max(config->rma[lane].max_put_short + 1,
/* TODO: UCP RMA supports only HOST memory type */
ucs_max(config->rma[lane].max_put_short.memtype_off + 1,
config->bcopy_thresh),
config->rma[lane].put_zcopy_thresh);
ucp_ep_config_print_rma_proto(stream, "get", lane, 0,
Expand Down
23 changes: 11 additions & 12 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ enum {
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCP_EP_STAT_TAG_TX_##_op, 1);


/*
* Thresholds with and without non-host memory
*/
typedef struct ucp_memtype_thresh {
ssize_t memtype_on;
ssize_t memtype_off;
} ucp_memtype_thresh_t;


/*
* Endpoint configuration key.
* This is filled by to the transport selection logic, according to the local
Expand Down Expand Up @@ -155,10 +164,10 @@ typedef struct ucp_ep_config_key {
* Configuration for RMA protocols
*/
typedef struct ucp_ep_rma_config {
size_t max_put_short; /* Maximal payload of put short */
ucp_memtype_thresh_t max_put_short; /* Maximal payload of put short */
size_t max_put_bcopy; /* Maximal total size of put_bcopy */
size_t max_put_zcopy;
size_t max_get_short; /* Maximal payload of get short */
ucp_memtype_thresh_t max_get_short; /* Maximal payload of get short */
size_t max_get_bcopy; /* Maximal total size of get_bcopy */
size_t max_get_zcopy;
size_t put_zcopy_thresh;
Expand Down Expand Up @@ -187,15 +196,6 @@ typedef struct ucp_ep_msg_config {
} ucp_ep_msg_config_t;


/*
* Thresholds with and without non-host memory
*/
typedef struct ucp_memtype_thresh {
ssize_t memtype_on;
ssize_t memtype_off;
} ucp_memtype_thresh_t;


typedef struct ucp_ep_config {

/* A key which uniquely defines the configuration, and all other fields of
Expand Down Expand Up @@ -285,7 +285,6 @@ typedef struct ucp_ep_config {
const ucp_proto_t *proto;
const ucp_proto_t *reply_proto;
} am_u;

} ucp_ep_config_t;


Expand Down
9 changes: 9 additions & 0 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,13 @@ static inline ucp_lane_index_t ucp_ep_get_cm_lane(ucp_ep_h ep)
return ucp_ep_config(ep)->key.cm_lane;
}

static UCS_F_ALWAYS_INLINE int
ucp_ep_send_is_inline(ucp_ep_h ep, const ucp_memtype_thresh_t *max_short,
ssize_t length)
{
return (ucs_likely(length <= max_short->memtype_off) ||
(length <= max_short->memtype_on &&
ucp_memory_type_cache_is_empty(ep->worker->context)));
}

#endif
24 changes: 13 additions & 11 deletions src/ucp/core/ucp_mm.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@ typedef struct ucp_tl_rkey {
typedef struct ucp_rkey {
/* cached values for the most recent endpoint configuration */
struct {
ucp_ep_cfg_index_t ep_cfg_index; /* EP configuration relevant for the cache */
ucp_lane_index_t rma_lane; /* Lane to use for RMAs */
ucp_lane_index_t amo_lane; /* Lane to use for AMOs */
unsigned max_put_short;/* Cached value of max_put_short */
uct_rkey_t rma_rkey; /* Key to use for RMAs */
uct_rkey_t amo_rkey; /* Key to use for AMOs */
ucp_amo_proto_t *amo_proto; /* Protocol for AMOs */
ucp_rma_proto_t *rma_proto; /* Protocol for RMAs */
ucp_ep_cfg_index_t ep_cfg_index; /* EP configuration relevant for the cache */
ucp_lane_index_t rma_lane; /* Lane to use for RMAs */
ucp_lane_index_t amo_lane; /* Lane to use for AMOs */
ucp_memtype_thresh_t max_put_short; /* Cached value of max_put_short */
uct_rkey_t rma_rkey; /* Key to use for RMAs */
uct_rkey_t amo_rkey; /* Key to use for AMOs */
ucp_amo_proto_t *amo_proto; /* Protocol for AMOs */
ucp_rma_proto_t *rma_proto; /* Protocol for RMAs */
} cache;
ucp_md_map_t md_map; /* Which *remote* MDs have valid memory handles */
ucs_memory_type_t mem_type; /* Memory type of remote key memory */
ucp_md_map_t md_map; /* Which *remote* MDs have valid memory handles */
ucs_memory_type_t mem_type; /* Memory type of remote key memory */
#if ENABLE_PARAMS_CHECK
ucp_ep_h ep;
#endif
ucp_tl_rkey_t tl_rkey[0]; /* UCT rkey for every remote MD */
ucp_tl_rkey_t tl_rkey[0]; /* UCT rkey for every remote MD */
} ucp_rkey_t;


Expand Down Expand Up @@ -210,5 +210,7 @@ ucp_memh2uct(ucp_mem_h memh, ucp_md_index_t md_idx)
#define UCP_MEM_IS_CUDA(_mem_type) ((_mem_type) == UCS_MEMORY_TYPE_CUDA)
#define UCP_MEM_IS_CUDA_MANAGED(_mem_type) ((_mem_type) == UCS_MEMORY_TYPE_CUDA_MANAGED)
#define UCP_MEM_IS_ROCM_MANAGED(_mem_type) ((_mem_type) == UCS_MEMORY_TYPE_ROCM_MANAGED)
#define UCP_MEM_IS_ACCESSIBLE_FROM_CPU(_mem_type) \
(UCS_BIT(_mem_type) & UCS_MEMORY_TYPES_CPU_ACCESSIBLE)

#endif
9 changes: 6 additions & 3 deletions src/ucp/core/ucp_rkey.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include <inttypes.h>


const ucp_memtype_thresh_t ucp_rma_sw_max_put_short = {-1, -1};


static struct {
ucp_md_map_t md_map;
uint8_t mem_type;
Expand Down Expand Up @@ -429,14 +432,14 @@ void ucp_rkey_resolve_inner(ucp_rkey_h rkey, ucp_ep_h ep)
int rma_sw, amo_sw;

rkey->cache.rma_lane = ucp_config_find_rma_lane(context, config,
UCS_MEMORY_TYPE_HOST,
rkey->mem_type,
config->key.rma_lanes, rkey,
0, &uct_rkey);
rma_sw = (rkey->cache.rma_lane == UCP_NULL_LANE);
if (rma_sw) {
rkey->cache.rma_proto = &ucp_rma_sw_proto;
rkey->cache.rma_rkey = UCT_INVALID_RKEY;
rkey->cache.max_put_short = 0;
rkey->cache.max_put_short = ucp_rma_sw_max_put_short;
} else {
rkey->cache.rma_proto = &ucp_rma_basic_proto;
rkey->cache.rma_rkey = uct_rkey;
Expand All @@ -445,7 +448,7 @@ void ucp_rkey_resolve_inner(ucp_rkey_h rkey, ucp_ep_h ep)
}

rkey->cache.amo_lane = ucp_config_find_rma_lane(context, config,
UCS_MEMORY_TYPE_HOST,
rkey->mem_type,
config->key.amo_lanes, rkey,
0, &uct_rkey);
amo_sw = (rkey->cache.amo_lane == UCP_NULL_LANE);
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/rma/rma_basic.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ static ucs_status_t ucp_rma_basic_progress_put(uct_pending_req_t *self)
ucs_assert(rkey->cache.ep_cfg_index == ep->cfg_index);
ucs_assert(rkey->cache.rma_lane == lane);

if ((req->send.length <= rma_config->max_put_short) ||
if (ucp_ep_send_is_inline(ep, &rkey->cache.max_put_short, req->send.length) &&
(req->send.length <= ucp_ep_config(ep)->bcopy_thresh))
{
packed_len = ucs_min(req->send.length, rma_config->max_put_short);
packed_len = req->send.length;
status = UCS_PROFILE_CALL(uct_ep_put_short,
ep->uct_eps[lane],
req->send.buffer,
Expand Down
34 changes: 29 additions & 5 deletions src/ucp/rma/rma_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,17 @@ ucp_rma_request_init(ucp_request_t *req, ucp_ep_h ep, const void *buffer,
req->send.ep = ep;
req->send.buffer = (void*)buffer;
req->send.datatype = ucp_dt_make_contig(1);
req->send.mem_type = UCS_MEMORY_TYPE_HOST;
req->send.mem_type = ucp_memory_type_detect(ep->worker->context,
buffer, length);
if (ucs_unlikely(!UCP_MEM_IS_ACCESSIBLE_FROM_CPU(req->send.mem_type) ||
!UCP_MEM_IS_ACCESSIBLE_FROM_CPU(rkey->mem_type))) {
/* TODO: remove when support for CUDA/RoCM memory types will be added */
ucs_error("UCP doesn't support RMA for \"%s\"<->\"%s\" memory types",
ucs_memory_type_names[req->send.mem_type],
ucs_memory_type_names[rkey->mem_type]);
return UCS_ERR_INVALID_PARAM;
}

req->send.length = length;
req->send.rma.remote_addr = remote_addr;
req->send.rma.rkey = rkey;
Expand Down Expand Up @@ -166,10 +176,16 @@ ucp_rma_nonblocking(ucp_ep_h ep, const void *buffer, size_t length,
progress_cb, zcopy_thresh,
UCP_REQUEST_FLAG_RELEASED);
if (ucs_unlikely(status != UCS_OK)) {
ucp_request_put(req);
return status;
}

return ucp_request_send(req, 0);
status = ucp_request_send(req, 0);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
ucp_request_put(req);
}

return status;
}

static UCS_F_ALWAYS_INLINE ucs_status_ptr_t
Expand All @@ -178,6 +194,7 @@ ucp_rma_nonblocking_cb(ucp_ep_h ep, const void *buffer, size_t length,
uct_pending_callback_t progress_cb, size_t zcopy_thresh,
ucp_send_callback_t cb)
{
ucs_status_ptr_t status_ptr;
ucs_status_t status;
ucp_request_t *req;

Expand All @@ -189,10 +206,17 @@ ucp_rma_nonblocking_cb(ucp_ep_h ep, const void *buffer, size_t length,
status = ucp_rma_request_init(req, ep, buffer, length, remote_addr, rkey,
progress_cb, zcopy_thresh, 0);
if (ucs_unlikely(status != UCS_OK)) {
ucp_request_put(req);
return UCS_STATUS_PTR(status);
}

return ucp_rma_send_request_cb(req, cb);
status_ptr = ucp_rma_send_request_cb(req, cb);
if (ucs_unlikely(!UCS_PTR_IS_PTR(status_ptr) &&
UCS_PTR_IS_ERR(status_ptr))) {
ucp_request_put(req);
}

return status_ptr;
}

ucs_status_t ucp_put_nbi(ucp_ep_h ep, const void *buffer, size_t length,
Expand All @@ -214,7 +238,7 @@ ucs_status_t ucp_put_nbi(ucp_ep_h ep, const void *buffer, size_t length,
}

/* Fast path for a single short message */
if (ucs_likely((ssize_t)length <= (int)rkey->cache.max_put_short)) {
if (ucp_ep_send_is_inline(ep, &rkey->cache.max_put_short, length)) {
status = UCS_PROFILE_CALL(uct_ep_put_short, ep->uct_eps[rkey->cache.rma_lane],
buffer, length, remote_addr, rkey->cache.rma_rkey);
if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
Expand Down Expand Up @@ -252,7 +276,7 @@ ucs_status_ptr_t ucp_put_nb(ucp_ep_h ep, const void *buffer, size_t length,
}

/* Fast path for a single short message */
if (ucs_likely((ssize_t)length <= (int)rkey->cache.max_put_short)) {
if (ucp_ep_send_is_inline(ep, &rkey->cache.max_put_short, length)) {
status = UCS_PROFILE_CALL(uct_ep_put_short, ep->uct_eps[rkey->cache.rma_lane],
buffer, length, remote_addr, rkey->cache.rma_rkey);
if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
Expand Down
17 changes: 4 additions & 13 deletions src/ucp/tag/tag_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,6 @@ ucp_tag_send_req_init(ucp_request_t* req, ucp_ep_h ep, const void* buffer,
req->send.pending_lane = UCP_NULL_LANE;
}

static UCS_F_ALWAYS_INLINE int
ucp_tag_eager_is_inline(ucp_ep_h ep, const ucp_memtype_thresh_t *max_eager_short,
ssize_t length)
{
return (ucs_likely(length <= max_eager_short->memtype_off) ||
(length <= max_eager_short->memtype_on &&
ucp_memory_type_cache_is_empty(ep->worker->context)));
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_tag_send_inline(ucp_ep_h ep, const void *buffer, size_t count,
uintptr_t datatype, ucp_tag_t tag)
Expand All @@ -166,14 +157,14 @@ ucp_tag_send_inline(ucp_ep_h ep, const void *buffer, size_t count,

length = ucp_contig_dt_length(datatype, count);

if (ucp_tag_eager_is_inline(ep, &ucp_ep_config(ep)->tag.max_eager_short,
length)) {
if (ucp_ep_send_is_inline(ep, &ucp_ep_config(ep)->tag.max_eager_short,
length)) {
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(ucp_eager_hdr_t));
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(uint64_t));
status = uct_ep_am_short(ucp_ep_get_am_uct_ep(ep), UCP_AM_ID_EAGER_ONLY,
tag, buffer, length);
} else if (ucp_tag_eager_is_inline(ep, &ucp_ep_config(ep)->tag.offload.max_eager_short,
length)) {
} else if (ucp_ep_send_is_inline(ep, &ucp_ep_config(ep)->tag.offload.max_eager_short,
length)) {
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(uct_tag_t));
status = uct_ep_tag_eager_short(ucp_ep_get_tag_uct_ep(ep), tag, buffer,
length);
Expand Down
8 changes: 8 additions & 0 deletions src/ucs/memory/memory_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@

BEGIN_C_DECLS


/* Memory types accessible from CPU */
#define UCS_MEMORY_TYPES_CPU_ACCESSIBLE \
(UCS_BIT(UCS_MEMORY_TYPE_HOST) | \
UCS_BIT(UCS_MEMORY_TYPE_CUDA_MANAGED) | \
UCS_BIT(UCS_MEMORY_TYPE_ROCM_MANAGED))


/*
* Memory types
*/
Expand Down
2 changes: 1 addition & 1 deletion src/uct/ib/base/ib_md.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ static ucs_status_t uct_ib_md_query(uct_md_h uct_md, uct_md_attr_t *md_attr)
UCT_MD_FLAG_NEED_MEMH |
UCT_MD_FLAG_NEED_RKEY |
UCT_MD_FLAG_ADVISE;
md_attr->cap.reg_mem_types = UCS_BIT(UCS_MEMORY_TYPE_HOST);
md_attr->cap.reg_mem_types = UCS_MEMORY_TYPES_CPU_ACCESSIBLE;
md_attr->cap.access_mem_type = UCS_MEMORY_TYPE_HOST;
md_attr->cap.detect_mem_types = 0;

Expand Down
Loading

0 comments on commit cacdf20

Please sign in to comment.