From 869bf03ae99780feee80e41b549e5a468814429e Mon Sep 17 00:00:00 2001
From: Devendar Bureddy
Date: Wed, 2 Sep 2020 10:13:13 +0300
Subject: [PATCH 1/2] UCX/RNDV/CUDA: RNDV protocol improvements for CUDA

---
 src/ucp/core/ucp_context.c |   4 +
 src/ucp/core/ucp_context.h |   2 +
 src/ucp/core/ucp_request.h |  15 ++-
 src/ucp/tag/offload.c      |   3 +-
 src/ucp/tag/rndv.c         | 264 ++++++++++++++++++++++++-------
 src/ucp/tag/rndv.h         |  12 +-
 6 files changed, 194 insertions(+), 106 deletions(-)

diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c
index 4cdf05fca83..0e46ff99ffa 100644
--- a/src/ucp/core/ucp_context.c
+++ b/src/ucp/core/ucp_context.c
@@ -256,6 +256,10 @@ static ucs_config_field_t ucp_config_table[] = {
    "RNDV fragment size \n",
    ucs_offsetof(ucp_config_t, ctx.rndv_frag_size), UCS_CONFIG_TYPE_MEMUNITS},
 
+  {"RNDV_PIPELINE_SEND_THRESH", "inf",
+   "RNDV size threshold to enable sender side pipeline for mem type\n",
+   ucs_offsetof(ucp_config_t, ctx.rndv_pipeline_send_thresh), UCS_CONFIG_TYPE_MEMUNITS},
+
   {"MEMTYPE_CACHE", "y",
    "Enable memory type (cuda/rocm) cache \n",
    ucs_offsetof(ucp_config_t, ctx.enable_memtype_cache), UCS_CONFIG_TYPE_BOOL},

diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h
index d36f37c7012..e56f7902048 100644
--- a/src/ucp/core/ucp_context.h
+++ b/src/ucp/core/ucp_context.h
@@ -61,6 +61,8 @@ typedef struct ucp_context_config {
     size_t seg_size;
     /** RNDV pipeline fragment size */
     size_t rndv_frag_size;
+    /** RNDV pipline send threshold */
+    size_t rndv_pipeline_send_thresh;
     /** Threshold for using tag matching offload capabilities. Smaller buffers
      * will not be posted to the transport. */
     size_t tm_thresh;

diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h
index 2cfd1043800..15d83eaf11a 100644
--- a/src/ucp/core/ucp_request.h
+++ b/src/ucp/core/ucp_request.h
@@ -154,12 +154,15 @@ struct ucp_request {
         } proxy;
 
         struct {
-            uint64_t         remote_address; /* address of the sender's data buffer */
-            uintptr_t        remote_request; /* pointer to the sender's request */
-            ucp_request_t    *rreq;          /* receive request on the recv side */
-            ucp_rkey_h       rkey;           /* key for remote send buffer */
-            ucp_lane_map_t   lanes_map;      /* used lanes map */
-            ucp_lane_index_t lane_count;     /* number of lanes used in transaction */
+            uint64_t         remote_address;  /* address of the sender's data buffer */
+            uintptr_t        remote_request;  /* pointer to the sender's request */
+            ucp_request_t    *rreq;           /* receive request on the recv side */
+            ucp_rkey_h       rkey;            /* key for remote send buffer */
+            ucp_lane_map_t   lanes_map_avail; /* used lanes map */
+            ucp_lane_map_t   lanes_map_all;   /* actual lanes map */
+            uint8_t          lanes_count;     /* actual lanes count */
+            uint8_t          rkey_index[UCP_MAX_LANES];
+
         } rndv_get;
 
         struct {

diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c
index 65dc573b631..240f6daf450 100644
--- a/src/ucp/tag/offload.c
+++ b/src/ucp/tag/offload.c
@@ -561,8 +561,7 @@ ucs_status_t ucp_tag_offload_sw_rndv(uct_pending_req_t *self)
     rndv_rts_hdr = ucs_alloca(rndv_hdr_len);
     packed_len   = ucp_tag_rndv_rts_pack(rndv_rts_hdr, req);
     ucs_assert((rndv_rts_hdr->address != 0) || !UCP_DT_IS_CONTIG(req->send.datatype) ||
-               !ucp_rndv_is_get_zcopy(req->send.mem_type,
-                                      ep->worker->context->config.ext.rndv_mode));
+               !ucp_rndv_is_get_zcopy(req, ep->worker->context));
 
     return uct_ep_tag_rndv_request(ep->uct_eps[req->send.lane], req->send.msg_proto.tag.tag,
                                    rndv_rts_hdr, packed_len, 0);

diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c
index 2a55dae9f3a..6615c587bb0 100644
--- a/src/ucp/tag/rndv.c
+++ b/src/ucp/tag/rndv.c
@@ -17,7 +17,8 @@
 
 static int ucp_rndv_is_recv_pipeline_needed(ucp_request_t *rndv_req,
                                             const ucp_rndv_rts_hdr_t *rndv_rts_hdr,
-                                            ucs_memory_type_t mem_type)
+                                            ucs_memory_type_t mem_type,
+                                            int is_get_zcopy_failed)
 {
     ucp_md_index_t md_index;
     uct_md_attr_t *md_attr;
@@ -28,6 +29,10 @@ static int ucp_rndv_is_recv_pipeline_needed(ucp_request_t *rndv_req,
         return 0;
     }
 
+    if (is_get_zcopy_failed) {
+        return 1;
+    }
+
     /* disqualify recv side pipeline if
      * a mem_type bw lane exist AND
      * lane can do RMA on remote mem_type */
@@ -50,25 +55,13 @@ static int ucp_rndv_is_recv_pipeline_needed(ucp_request_t *rndv_req,
 
 static int ucp_rndv_is_put_pipeline_needed(uintptr_t remote_address,
                                            size_t length, size_t min_get_zcopy,
-                                           size_t max_get_zcopy)
+                                           size_t max_get_zcopy,
+                                           int is_get_zcopy_failed)
 {
     /* fallback to PUT pipeline if remote mem type is non-HOST memory OR
      * can't do GET ZCOPY */
     return ((remote_address == 0) || (max_get_zcopy == 0) ||
-            (length < min_get_zcopy));
-}
-
-static ucp_lane_index_t
-ucp_rndv_req_get_zcopy_rma_lane(ucp_request_t *rndv_req, ucp_lane_map_t ignore,
-                                uct_rkey_t *uct_rkey_p)
-{
-    ucp_ep_h ep                = rndv_req->send.ep;
-    ucp_ep_config_t *ep_config = ucp_ep_config(ep);
-
-    return ucp_rkey_find_rma_lane(ep->worker->context, ep_config,
-                                  rndv_req->send.mem_type,
-                                  ep_config->tag.rndv.get_zcopy_lanes,
-                                  rndv_req->send.rndv_get.rkey, ignore, uct_rkey_p);
+            (length < min_get_zcopy) || is_get_zcopy_failed);
 }
 
 size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
@@ -85,8 +78,7 @@
 
     /* Pack remote keys (which can be empty list) */
     if (UCP_DT_IS_CONTIG(sreq->send.datatype) &&
-        ucp_rndv_is_get_zcopy(sreq->send.mem_type,
-                              worker->context->config.ext.rndv_mode)) {
+        ucp_rndv_is_get_zcopy(sreq, worker->context)) {
         /* pack rkey, ask target to do get_zcopy */
         rndv_rts_hdr->address = (uintptr_t)sreq->send.buffer;
         packed_rkey_size      = ucp_rkey_pack_uct(worker->context,
@@ -181,8 +173,7 @@ ucs_status_t ucp_tag_rndv_reg_send_buffer(ucp_request_t *sreq)
     ucs_status_t status;
 
     if (UCP_DT_IS_CONTIG(sreq->send.datatype) &&
-        ucp_rndv_is_get_zcopy(sreq->send.mem_type,
-                              ep->worker->context->config.ext.rndv_mode)) {
+        ucp_rndv_is_get_zcopy(sreq, ep->worker->context)) {
         /* register a contiguous buffer for rma_get */
         md_map = ucp_ep_config(ep)->key.rma_bw_md_map;
 
@@ -419,63 +410,33 @@ static void ucp_rndv_req_send_rtr(ucp_request_t *rndv_req, ucp_request_t *rreq,
     ucp_request_send(rndv_req, 0);
 }
 
-static void ucp_rndv_get_lanes_count(ucp_request_t *rndv_req)
-{
-    ucp_ep_h ep        = rndv_req->send.ep;
-    ucp_lane_map_t map = 0;
-    uct_rkey_t uct_rkey;
-    ucp_lane_index_t lane;
-
-    if (ucs_likely(rndv_req->send.rndv_get.lane_count != 0)) {
-        return; /* already resolved */
-    }
-
-    while ((lane = ucp_rndv_req_get_zcopy_rma_lane(rndv_req, map, &uct_rkey))
-           != UCP_NULL_LANE) {
-        rndv_req->send.rndv_get.lane_count++;
-        map |= UCS_BIT(lane);
-    }
-
-    rndv_req->send.rndv_get.lane_count = ucs_min(rndv_req->send.rndv_get.lane_count,
-                                                 ep->worker->context->config.ext.max_rndv_lanes);
-}
-
 static ucp_lane_index_t
 ucp_rndv_get_zcopy_get_lane(ucp_request_t *rndv_req, uct_rkey_t *uct_rkey)
 {
-    ucp_lane_index_t lane;
+    ucp_lane_index_t lane_idx;
+    ucp_ep_config_t *ep_config;
+    ucp_rkey_h rkey;
+    uint8_t rkey_index;
 
-    lane = ucp_rndv_req_get_zcopy_rma_lane(rndv_req,
-                                           rndv_req->send.rndv_get.lanes_map,
-                                           uct_rkey);
-
-    if ((lane == UCP_NULL_LANE) && (rndv_req->send.rndv_get.lanes_map != 0)) {
-        /* lanes_map != 0 - no more lanes (but BW lanes are exist because map is
-         * not NULL - we found at least one lane on previous iteration).
-         * reset used lanes map to NULL and iterate it again */
-        rndv_req->send.rndv_get.lanes_map = 0;
-        lane = ucp_rndv_req_get_zcopy_rma_lane(rndv_req,
-                                               rndv_req->send.rndv_get.lanes_map,
-                                               uct_rkey);
+    if (ucs_unlikely(!rndv_req->send.rndv_get.lanes_map_all)) {
+        return UCP_NULL_LANE;
     }
 
-    return lane;
+    lane_idx   = ucs_ffs64_safe(rndv_req->send.rndv_get.lanes_map_avail);
+    ucs_assert(lane_idx < UCP_MAX_LANES);
+    rkey       = rndv_req->send.rndv_get.rkey;
+    rkey_index = rndv_req->send.rndv_get.rkey_index[lane_idx];
+    *uct_rkey  = (rkey_index != UCP_NULL_RESOURCE) ?
+                 rkey->tl_rkey[rkey_index].rkey.rkey : UCT_INVALID_RKEY;
+    ep_config  = ucp_ep_config(rndv_req->send.ep);
+    return ep_config->tag.rndv.get_zcopy_lanes[lane_idx];
 }
 
 static void ucp_rndv_get_zcopy_next_lane(ucp_request_t *rndv_req)
 {
-    /* mask lane for next iteration.
-     * next time this lane will not be selected & we continue
-     * with another lane */
-    ucp_ep_h ep = rndv_req->send.ep;
-
-    rndv_req->send.rndv_get.lanes_map |= UCS_BIT(rndv_req->send.lane);
-
-    /* in case if masked too much lanes - reset mask to zero
-     * to select first lane next time */
-    if (ucs_popcount(rndv_req->send.rndv_get.lanes_map) >=
-        ep->worker->context->config.ext.max_rndv_lanes) {
-        rndv_req->send.rndv_get.lanes_map = 0;
+    rndv_req->send.rndv_get.lanes_map_avail &= rndv_req->send.rndv_get.lanes_map_avail - 1;
+    if (!rndv_req->send.rndv_get.lanes_map_avail) {
+        rndv_req->send.rndv_get.lanes_map_avail = rndv_req->send.rndv_get.lanes_map_all;
     }
 }
 
@@ -499,9 +460,6 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
     int pending_add_res;
     ucp_lane_index_t lane;
 
-    ucp_rndv_get_lanes_count(rndv_req);
-    ucs_assert_always(rndv_req->send.rndv_get.lane_count > 0);
-
     /* Figure out which lane to use for get operation */
     rndv_req->send.lane = lane =
         ucp_rndv_get_zcopy_get_lane(rndv_req, &uct_rkey);
@@ -521,6 +479,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
         return UCS_OK;
     }
 
+    ucs_assert_always(rndv_req->send.rndv_get.lanes_count > 0);
+
     if (!rndv_req->send.mdesc) {
         status = ucp_send_request_add_reg_lane(rndv_req, lane);
         ucs_assert_always(status == UCS_OK);
@@ -540,7 +500,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
         length = ucp_mtu - remaining;
     } else {
         chunk = ucs_align_up((size_t)(rndv_req->send.length /
-                                      rndv_req->send.rndv_get.lane_count
+                                      rndv_req->send.rndv_get.lanes_count
                                       * config->tag.rndv.scale[lane]), align);
         length = ucs_min(chunk, rndv_req->send.length - offset);
@@ -622,11 +582,93 @@ static void ucp_rndv_put_completion(uct_completion_t *self, ucs_status_t status)
     }
 }
 
-static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rreq,
-                                      const ucp_rndv_rts_hdr_t *rndv_rts_hdr)
+static void ucp_rndv_req_init_get_zcopy_lane_map(ucp_request_t *rndv_req)
+{
+    ucp_ep_h ep                = rndv_req->send.ep;
+    ucp_ep_config_t *ep_config = ucp_ep_config(ep);
+    ucp_context_h context      = ep->worker->context;
+    ucs_memory_type_t mem_type = rndv_req->send.mem_type;
+    ucp_rkey_h rkey            = rndv_req->send.rndv_get.rkey;
+    ucp_lane_map_t lane_map;
+    ucp_lane_index_t lane, lane_idx;
+    ucp_md_index_t md_index;
+    uct_md_attr_t *md_attr;
+    ucp_md_index_t dst_md_index;
+    ucp_rsc_index_t rsc_index;
+    uct_iface_attr_t *iface_attr;
+    double max_lane_bw, lane_bw;
+    int i;
+
+    max_lane_bw = 0;
+    lane_map    = 0;
+    for (i = 0; i < UCP_MAX_LANES; i++) {
+        lane = ep_config->tag.rndv.get_zcopy_lanes[i];
+        if (lane == UCP_NULL_LANE) {
+            break; /* no more lanes */
+        }
+
+        md_index   = ep_config->md_index[lane];
+        md_attr    = &context->tl_mds[md_index].attr;
+        rsc_index  = ep_config->key.lanes[lane].rsc_index;
+        iface_attr = ucp_worker_iface_get_attr(ep->worker, rsc_index);
+        lane_bw    = ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth);
+
+        if (ucs_unlikely((md_index != UCP_NULL_RESOURCE) &&
+                         !(md_attr->cap.flags & UCT_MD_FLAG_NEED_RKEY))) {
+            /* Lane does not need rkey, can use the lane with invalid rkey */
+            if (!rkey || ((mem_type == md_attr->cap.access_mem_type) &&
+                          (mem_type == rkey->mem_type))) {
+                rndv_req->send.rndv_get.rkey_index[i] = UCP_NULL_RESOURCE;
+                lane_map                             |= UCS_BIT(i);
+                max_lane_bw = ucs_max(max_lane_bw, lane_bw);
+                continue;
+            }
+        }
+
+        if (ucs_unlikely((md_index != UCP_NULL_RESOURCE) &&
+                         (!(md_attr->cap.reg_mem_types & UCS_BIT(mem_type))))) {
+            continue;
+        }
+
+        dst_md_index = ep_config->key.lanes[lane].dst_md_index;
+        if (rkey && ucs_likely(rkey->md_map & UCS_BIT(dst_md_index))) {
+            /* Return first matching lane */
+            rndv_req->send.rndv_get.rkey_index[i] = ucs_bitmap2idx(rkey->md_map,
+                                                                   dst_md_index);
+            lane_map   |= UCS_BIT(i);
+            max_lane_bw = ucs_max(max_lane_bw, lane_bw);
+        }
+    }
+
+    if (ucs_popcount(lane_map) > 1) {
+        /* remove lanes if bandwidth is too less compare to best lane */
+        ucs_for_each_bit(lane_idx, lane_map) {
+            ucs_assert(lane_idx < UCP_MAX_LANES);
+            lane       = ep_config->tag.rndv.get_zcopy_lanes[lane_idx];
+            rsc_index  = ep_config->key.lanes[lane].rsc_index;
+            iface_attr = ucp_worker_iface_get_attr(ep->worker, rsc_index);
+            lane_bw    = ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth);
+
+            if ((lane_bw/max_lane_bw) <
+                (1. / context->config.ext.multi_lane_max_ratio)) {
+                lane_map &= ~UCS_BIT(lane_idx);
+                rndv_req->send.rndv_get.rkey_index[lane_idx] = UCP_NULL_RESOURCE;
+            }
+        }
+    }
+
+    rndv_req->send.rndv_get.lanes_map_all   = lane_map;
+    rndv_req->send.rndv_get.lanes_map_avail = lane_map;
+    rndv_req->send.rndv_get.lanes_count     = ucs_popcount(lane_map);
+}
+
+static ucs_status_t ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req,
+                                              ucp_request_t *rreq,
+                                              const ucp_rndv_rts_hdr_t *rndv_rts_hdr)
 {
     ucp_ep_h ep = rndv_req->send.ep;
     ucs_status_t status;
+    uct_rkey_t uct_rkey;
 
     ucp_trace_req(rndv_req, "start rma_get rreq %p", rreq);
@@ -638,8 +680,6 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr
     rndv_req->send.rndv_get.remote_request = rndv_rts_hdr->sreq.reqptr;
     rndv_req->send.rndv_get.remote_address = rndv_rts_hdr->address;
     rndv_req->send.rndv_get.rreq           = rreq;
-    rndv_req->send.rndv_get.lanes_map      = 0;
-    rndv_req->send.rndv_get.lane_count     = 0;
     rndv_req->send.datatype                = rreq->recv.datatype;
 
     status = ucp_ep_rkey_unpack(ep, rndv_rts_hdr + 1,
@@ -653,8 +693,17 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr
     ucp_request_send_state_reset(rndv_req, ucp_rndv_get_completion,
                                  UCP_REQUEST_SEND_PROTO_RNDV_GET);
 
+    ucp_rndv_req_init_get_zcopy_lane_map(rndv_req);
+
+    rndv_req->send.lane = ucp_rndv_get_zcopy_get_lane(rndv_req, &uct_rkey);
+    if (rndv_req->send.lane == UCP_NULL_LANE) {
+        return UCS_ERR_UNREACHABLE;
+    }
+
     UCP_WORKER_STAT_RNDV(ep->worker, GET_ZCOPY, 1);
     ucp_request_send(rndv_req, 0);
+
+    return UCS_OK;
 }
 
 UCS_PROFILE_FUNC_VOID(ucp_rndv_recv_frag_put_completion, (self, status),
@@ -752,11 +801,13 @@
 static ucs_status_t
 ucp_rndv_send_frag_get_mem_type(ucp_request_t *sreq, uintptr_t rreq_ptr,
                                 size_t length, uint64_t remote_address,
                                 ucs_memory_type_t remote_mem_type, ucp_rkey_h rkey,
+                                uint8_t *rkey_index, ucp_lane_map_t lanes_map,
                                 uct_completion_callback_t comp_cb)
 {
     ucp_worker_h worker = sreq->send.ep->worker;
     ucp_request_t *freq;
     ucp_mem_desc_t *mdesc;
+    ucp_lane_index_t i;
 
     /* GET fragment to stage buffer */
@@ -778,12 +829,19 @@ ucp_rndv_send_frag_get_mem_type(ucp_request_t *sreq, uintptr_t rreq_ptr,
                                   comp_cb, mdesc, remote_mem_type, length,
                                   ucp_rndv_progress_rma_get_zcopy);
 
-    freq->send.rndv_get.rkey           = rkey;
-    freq->send.rndv_get.remote_address = remote_address;
-    freq->send.rndv_get.remote_request = rreq_ptr;
-    freq->send.rndv_get.lanes_map      = 0;
-    freq->send.rndv_get.lane_count     = 0;
-    freq->send.rndv_get.rreq           = sreq;
+    freq->send.rndv_get.rkey            = rkey;
+    freq->send.rndv_get.remote_address  = remote_address;
+    freq->send.rndv_get.remote_request  = rreq_ptr;
+    freq->send.rndv_get.rreq            = sreq;
+    freq->send.rndv_get.lanes_map_all   = lanes_map;
+    freq->send.rndv_get.lanes_map_avail = lanes_map;
+    freq->send.rndv_get.lanes_count     = ucs_popcount(lanes_map);
+
+    for (i = 0; i < UCP_MAX_LANES; i++) {
+        freq->send.rndv_get.rkey_index[i] = rkey_index ? rkey_index[i]
+                                                       : UCP_NULL_RESOURCE;
+    }
+
     return ucp_request_send(freq, 0);
 }
@@ -830,6 +888,7 @@ ucp_rndv_recv_start_get_pipeline(ucp_worker_h worker, ucp_request_t *rndv_req,
     rndv_req->send.rndv_get.rreq   = rreq;
     rndv_req->send.length          = size;
     rndv_req->send.state.dt.offset = 0;
+    rndv_req->send.mem_type        = rreq->recv.mem_type;
 
     /* Protocol:
      * Step 1: GET remote fragment into HOST fragment buffer
@@ -844,6 +903,8 @@ ucp_rndv_recv_start_get_pipeline(ucp_worker_h worker, ucp_request_t *rndv_req,
                   ucp_ep_peer_name(rndv_req->send.ep), ucs_status_string(status));
     }
 
+    ucp_rndv_req_init_get_zcopy_lane_map(rndv_req);
+
     offset = 0;
     while (offset != size) {
         length = ucp_rndv_adjust_zcopy_length(min_zcopy, max_frag_size, 0,
@@ -853,6 +914,8 @@ ucp_rndv_recv_start_get_pipeline(ucp_worker_h worker, ucp_request_t *rndv_req,
         ucp_rndv_send_frag_get_mem_type(rndv_req, remote_request, length,
                                         remote_address + offset,
                                         UCS_MEMORY_TYPE_HOST,
                                         rndv_req->send.rndv_get.rkey,
+                                        rndv_req->send.rndv_get.rkey_index,
+                                        rndv_req->send.rndv_get.lanes_map_all,
                                         ucp_rndv_recv_frag_get_completion);
 
         offset += length;
@@ -1074,6 +1137,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
     ucp_request_t *rndv_req;
     ucp_ep_h ep;
     ucp_ep_config_t *ep_config;
+    ucs_status_t status;
+    int is_get_zcopy_failed;
 
     UCS_ASYNC_BLOCK(&worker->async);
 
@@ -1096,6 +1161,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
     rndv_req->flags             = 0;
    rndv_req->send.mdesc        = NULL;
     rndv_req->send.pending_lane = UCP_NULL_LANE;
+    is_get_zcopy_failed         = 0;
 
     ucp_trace_req(rreq,
                   "rndv matched remote {address 0x%"PRIx64" size %zu sreq 0x%lx}"
@@ -1124,25 +1190,34 @@
 
     if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) {
         if ((rndv_rts_hdr->address != 0) &&
-            (ucp_rndv_is_get_zcopy(rreq->recv.mem_type, rndv_mode)) &&
             ucp_rndv_test_zcopy_scheme_support(rndv_rts_hdr->size,
                                                ep_config->tag.rndv.min_get_zcopy,
                                                ep_config->tag.rndv.max_get_zcopy,
                                                ep_config->tag.rndv.get_zcopy_split)) {
             /* try to fetch the data with a get_zcopy operation */
-            ucp_rndv_req_send_rma_get(rndv_req, rreq, rndv_rts_hdr);
-            goto out;
-        } else if (rndv_mode == UCP_RNDV_MODE_AUTO) {
+            status = ucp_rndv_req_send_rma_get(rndv_req, rreq, rndv_rts_hdr);
+            if (status == UCS_OK) {
+                goto out;
+            }
+
+            /* fallback to non get zcopy protocol */
+            ucp_rkey_destroy(rndv_req->send.rndv_get.rkey);
+            is_get_zcopy_failed = 1;
+        }
+
+        if (rndv_mode == UCP_RNDV_MODE_AUTO) {
             /* check if we need pipelined memtype staging */
             if (UCP_MEM_IS_CUDA(rreq->recv.mem_type) &&
                 ucp_rndv_is_recv_pipeline_needed(rndv_req, rndv_rts_hdr,
-                                                 rreq->recv.mem_type)) {
+                                                 rreq->recv.mem_type,
+                                                 is_get_zcopy_failed)) {
                 ucp_rndv_recv_data_init(rreq, rndv_rts_hdr->size);
                 if (ucp_rndv_is_put_pipeline_needed(rndv_rts_hdr->address,
                                                     rndv_rts_hdr->size,
                                                     ep_config->tag.rndv.min_get_zcopy,
-                                                    ep_config->tag.rndv.max_get_zcopy)) {
+                                                    ep_config->tag.rndv.max_get_zcopy,
+                                                    is_get_zcopy_failed)) {
                     /* send FRAG RTR for sender to PUT the fragment. */
                     ucp_rndv_send_frag_rtr(worker, rndv_req, rreq, rndv_rts_hdr);
                 } else {
@@ -1156,10 +1231,14 @@
                 goto out;
             }
         }
-        /* put protocol is allowed - register receive buffer memory for rma */
-        ucs_assert(rndv_rts_hdr->size <= rreq->recv.length);
-        ucp_request_recv_buffer_reg(rreq, ep_config->key.rma_bw_md_map,
-                                    rndv_rts_hdr->size);
+
+        if ((rndv_mode == UCP_RNDV_MODE_PUT_ZCOPY) ||
+            UCP_MEM_IS_CUDA(rreq->recv.mem_type)) {
+            /* put protocol is allowed - register receive buffer memory for rma */
+            ucs_assert(rndv_rts_hdr->size <= rreq->recv.length);
+            ucp_request_recv_buffer_reg(rreq, ep_config->key.rma_bw_md_map,
+                                        rndv_rts_hdr->size);
+        }
     }
 
     /* The sender didn't specify its address in the RTS, or the rndv mode was
@@ -1522,7 +1601,8 @@ static ucs_status_t ucp_rndv_send_start_put_pipeline(ucp_request_t *sreq,
         } else {
             ucp_rndv_send_frag_get_mem_type(fsreq, 0, length,
                 (uint64_t)UCS_PTR_BYTE_OFFSET(fsreq->send.buffer, offset),
-                fsreq->send.mem_type, NULL, ucp_rndv_put_pipeline_frag_get_completion);
+                fsreq->send.mem_type, NULL, NULL, UCS_BIT(0),
+                ucp_rndv_put_pipeline_frag_get_completion);
         }
 
         offset += length;

diff --git a/src/ucp/tag/rndv.h b/src/ucp/tag/rndv.h
index 60b72e60845..449c923c8bc 100644
--- a/src/ucp/tag/rndv.h
+++ b/src/ucp/tag/rndv.h
@@ -62,13 +62,13 @@ size_t ucp_tag_rndv_rts_pack(void *dest, void *arg);
 
 ucs_status_t ucp_tag_rndv_reg_send_buffer(ucp_request_t *sreq);
 
-static UCS_F_ALWAYS_INLINE int ucp_rndv_is_get_zcopy(ucs_memory_type_t mem_type,
-                                                     ucp_rndv_mode_t rndv_mode)
+static UCS_F_ALWAYS_INLINE int
+ucp_rndv_is_get_zcopy(ucp_request_t *req, ucp_context_h context)
 {
-    return ((rndv_mode == UCP_RNDV_MODE_GET_ZCOPY) ||
-            ((rndv_mode == UCP_RNDV_MODE_AUTO) &&
-             (UCP_MEM_IS_ACCESSIBLE_FROM_CPU(mem_type) ||
-              UCP_MEM_IS_ROCM(mem_type))));
+    return ((context->config.ext.rndv_mode == UCP_RNDV_MODE_GET_ZCOPY) ||
+            ((context->config.ext.rndv_mode == UCP_RNDV_MODE_AUTO) &&
+             (!UCP_MEM_IS_CUDA(req->send.mem_type) ||
+              (req->send.length < context->config.ext.rndv_pipeline_send_thresh))));
 }
 
 #endif

From 71387c06651cd0867eb8e547f5282e92ba4edb34 Mon Sep 17 00:00:00 2001
From: Devendar Bureddy
Date: Tue, 8 Sep 2020 22:46:54 +0300
Subject: [PATCH 2/2] UCX/RNDV/CUDA: fix topo

---
 src/ucp/core/ucp_context.h | 2 +-
 src/ucp/tag/rndv.c         | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h
index e56f7902048..5393877ea3d 100644
--- a/src/ucp/core/ucp_context.h
+++ b/src/ucp/core/ucp_context.h
@@ -61,7 +61,7 @@ typedef struct ucp_context_config {
     size_t seg_size;
     /** RNDV pipeline fragment size */
     size_t rndv_frag_size;
-    /** RNDV pipline send threshold */
+    /** RNDV pipeline send threshold */
     size_t rndv_pipeline_send_thresh;
     /** Threshold for using tag matching offload capabilities. Smaller buffers
      * will not be posted to the transport. */

diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c
index 6615c587bb0..b06967bca71 100644
--- a/src/ucp/tag/rndv.c
+++ b/src/ucp/tag/rndv.c
@@ -641,7 +641,7 @@ static void ucp_rndv_req_init_get_zcopy_lane_map(ucp_request_t *rndv_req)
     }
 
     if (ucs_popcount(lane_map) > 1) {
-        /* remove lanes if bandwidth is too less compare to best lane */
+        /* remove lanes if bandwidth is too low compared to the best lane */
        ucs_for_each_bit(lane_idx, lane_map) {
             ucs_assert(lane_idx < UCP_MAX_LANES);
             lane = ep_config->tag.rndv.get_zcopy_lanes[lane_idx];
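
---

How the reworked lane selection behaves: ucp_rndv_req_init_get_zcopy_lane_map()
resolves two bitmaps once per request. lanes_map_all records every GET-zcopy
lane that can reach the remote memory (together with a per-lane rkey index),
while lanes_map_avail is consumed one bit per fragment by
ucp_rndv_get_zcopy_get_lane()/ucp_rndv_get_zcopy_next_lane() and refilled from
the full map when it runs empty. The standalone C program below is a minimal
sketch of that rotation; the type and function names are invented for
illustration and are not the UCX API.

/* sketch.c - two-bitmap round-robin lane rotation (illustrative only) */
#include <stdint.h>
#include <stdio.h>

typedef struct {
    uint64_t all;   /* every lane selected when the request is set up */
    uint64_t avail; /* lanes not yet used in the current round */
} lane_map_t;

/* Index of the lowest set bit; the caller guarantees x != 0. */
static int lowest_set_bit(uint64_t x)
{
    int i = 0;
    while (!(x & 1)) {
        x >>= 1;
        i++;
    }
    return i;
}

/* Current lane, or -1 when no lane was ever selected
 * (the patch returns UCP_NULL_LANE in that case). */
static int lane_map_current(const lane_map_t *m)
{
    return (m->all == 0) ? -1 : lowest_set_bit(m->avail);
}

/* Advance: clear the lowest set bit; once the round is exhausted,
 * refill from the full map so the iteration wraps around. */
static void lane_map_next(lane_map_t *m)
{
    m->avail &= m->avail - 1;
    if (m->avail == 0) {
        m->avail = m->all;
    }
}

int main(void)
{
    lane_map_t m = { .all = 0x0b, .avail = 0x0b }; /* lanes 0, 1 and 3 */
    for (int i = 0; i < 7; i++) {
        printf("%d ", lane_map_current(&m));
        lane_map_next(&m);
    }
    printf("\n"); /* prints: 0 1 3 0 1 3 0 */
    return 0;
}

This is what lets ucp_rndv_progress_rma_get_zcopy() spread consecutive
fragments across all selected lanes without rescanning the endpoint
configuration on every call, which the removed
ucp_rndv_req_get_zcopy_rma_lane() search used to do.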
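The same init function also prunes slow lanes: after collecting the candidate
map it drops any lane whose bandwidth, relative to the best selected lane,
falls below 1 / multi_lane_max_ratio (the context config value the patch
reads). A minimal sketch of that filter with made-up bandwidth figures; only
the inequality mirrors the patch, everything else is invented:

/* filter.c - drop lanes much slower than the best one (illustrative only) */
#include <stdint.h>
#include <stdio.h>

static uint64_t filter_slow_lanes(const double *bw_gbps, int n_lanes,
                                  uint64_t lane_map, double max_ratio)
{
    double best = 0.0;
    int i;

    for (i = 0; i < n_lanes; i++) {
        if ((lane_map & (1ull << i)) && (bw_gbps[i] > best)) {
            best = bw_gbps[i]; /* bandwidth of the fastest selected lane */
        }
    }
    for (i = 0; i < n_lanes; i++) {
        if ((lane_map & (1ull << i)) &&
            (bw_gbps[i] / best < 1.0 / max_ratio)) {
            lane_map &= ~(1ull << i); /* too slow relative to the best lane */
        }
    }
    return lane_map;
}

int main(void)
{
    /* made-up numbers: an NVLink-like, a PCIe-like and a TCP-like lane */
    double bw[] = { 47.0, 12.0, 1.0 };
    uint64_t map = filter_slow_lanes(bw, 3, 0x7, 4.0);
    printf("0x%llx\n", (unsigned long long)map); /* 0x3: lane 2 dropped */
    return 0;
}

With max_ratio = 4, lane 1 survives (12/47 is about 0.26, above 0.25) while
lane 2 is removed, so a multi-lane CUDA transfer is not throttled by a lane an
order of magnitude slower than the rest.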
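On the send side, ucp_rndv_is_get_zcopy() now takes the request and the
context so it can consult the new RNDV_PIPELINE_SEND_THRESH knob: under
UCP_RNDV_MODE_AUTO a CUDA send buffer is advertised for remote GET zcopy only
while the message is below the threshold; at or above it the sender withholds
its address and drives its own pipeline instead. The default is "inf", so the
sender-side pipeline is opt-in. A truth-table sketch of that predicate under
an assumed 512k threshold (the enum and helper are invented for illustration):

/* thresh.c - sender-side GET-zcopy decision (illustrative only) */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef enum {
    RNDV_MODE_AUTO,
    RNDV_MODE_GET_ZCOPY,
    RNDV_MODE_PUT_ZCOPY
} rndv_mode_t;

static bool rndv_is_get_zcopy(rndv_mode_t mode, bool is_cuda, size_t length,
                              size_t pipeline_send_thresh)
{
    return (mode == RNDV_MODE_GET_ZCOPY) ||
           ((mode == RNDV_MODE_AUTO) &&
            (!is_cuda || (length < pipeline_send_thresh)));
}

int main(void)
{
    size_t thresh = 512 * 1024; /* e.g. UCX_RNDV_PIPELINE_SEND_THRESH=512k */

    /* 1MB CUDA buffer: above the threshold, the sender pipelines instead */
    printf("%d\n", rndv_is_get_zcopy(RNDV_MODE_AUTO, true, 1 << 20, thresh));  /* 0 */
    /* 4KB CUDA buffer: below the threshold, GET zcopy is offered */
    printf("%d\n", rndv_is_get_zcopy(RNDV_MODE_AUTO, true, 4096, thresh));     /* 1 */
    /* host memory is unaffected by the threshold */
    printf("%d\n", rndv_is_get_zcopy(RNDV_MODE_AUTO, false, 1 << 20, thresh)); /* 1 */
    return 0;
}

When the sender withholds its address this way, the receiver's
ucp_rndv_matched() takes the pipeline branches guarded by
ucp_rndv_is_recv_pipeline_needed()/ucp_rndv_is_put_pipeline_needed(), which
the first patch extends with the is_get_zcopy_failed fallback.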