diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index a10dd627e388..fb302c983b42 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -98,6 +98,10 @@ static ucs_config_field_t ucp_config_table[] = { "the eager_zcopy protocol", ucs_offsetof(ucp_config_t, ctx.rndv_perf_diff), UCS_CONFIG_TYPE_DOUBLE}, + {"RNDV_MRAIL", "n", + "Enable multirail-get rendezvous support", + ucs_offsetof(ucp_config_t, ctx.rndv_mrail), UCS_CONFIG_TYPE_BOOL}, + {"ZCOPY_THRESH", "auto", "Threshold for switching from buffer copy to zero copy protocol", ucs_offsetof(ucp_config_t, ctx.zcopy_thresh), UCS_CONFIG_TYPE_MEMUNITS}, diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 827f486ef873..2c4121198edb 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -53,6 +53,8 @@ typedef struct ucp_context_config { int use_mt_mutex; /** On-demand progress */ int adaptive_progress; + /** Rendezvous multirail support */ + int rndv_mrail; } ucp_context_config_t; diff --git a/src/ucp/core/ucp_request.inl b/src/ucp/core/ucp_request.inl index 84a8659bb95e..ef0f8f002a1d 100644 --- a/src/ucp/core/ucp_request.inl +++ b/src/ucp/core/ucp_request.inl @@ -81,10 +81,6 @@ ucp_request_complete_send(ucp_request_t *req, ucs_status_t status) ucs_status_string(status)); UCS_PROFILE_REQUEST_EVENT(req, "complete_send", status); - if (ucs_unlikely(req->flags & UCP_REQUEST_FLAG_RNDV_MRAIL)) { - ucs_mpool_put_inline(req->send.rndv_get.mrail); - } - ucp_request_complete(req, send.cb, status); } @@ -101,10 +97,6 @@ ucp_request_complete_recv(ucp_request_t *req, ucs_status_t status) --req->recv.worker->context->tm.offload.sw_req_count; } - if (ucs_unlikely(req->flags & UCP_REQUEST_FLAG_RNDV_MRAIL)) { - ucs_mpool_put_inline(req->send.rndv_get.mrail); - } - ucp_request_complete(req, recv.cb, status, &req->recv.info); } @@ -208,6 +200,14 @@ ucp_request_mrail_create(ucp_request_t *req) req->flags |= UCP_REQUEST_FLAG_RNDV_MRAIL; } +static UCS_F_ALWAYS_INLINE void +ucp_request_mrail_release(ucp_request_t *req) +{ + ucs_trace_req("mrail release request %p", req); + ucs_mpool_put_inline(req->send.rndv_get.mrail); + req->flags &= ~UCP_REQUEST_FLAG_RNDV_MRAIL; +} + static UCS_F_ALWAYS_INLINE void ucp_request_clear_rails(ucp_dt_state_t *state) { int i; @@ -240,7 +240,7 @@ static inline int ucp_request_mrail_reg(ucp_request_t *req) ucp_request_clear_rails(state); - for (i = 0; ucp_ep_is_rndv_lane_present(ep, i) && i < UCP_MAX_RAILS; i++) { + for (i = 0; i < UCP_MAX_RAILS && ucp_ep_is_rndv_lane_present(ep, i); i++) { lane = ucp_ep_get_rndv_get_lane(ep, i); if (ucp_ep_rndv_md_flags(ep, lane) & UCT_MD_FLAG_NEED_RKEY) { diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index a79f32f97f75..ab0a53fa51db 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -110,7 +110,7 @@ static void ucp_tag_rndv_unpack_mrail_rkeys(ucp_request_t *req, void *rkey_buf) ucp_request_mrail_create(req); - for (i = 0; ucp_ep_is_rndv_lane_present(ep, i) && i < UCP_MAX_RAILS; i++) { + for (i = 0; i < UCP_MAX_RAILS && ucp_ep_is_rndv_lane_present(ep, i); i++) { lane = ucp_ep_get_rndv_get_lane(ep, i); if (ucp_ep_rndv_md_flags(ep, lane) & UCT_MD_FLAG_NEED_RKEY) { UCS_PROFILE_CALL(uct_rkey_unpack, rkey_buf + packet, @@ -119,8 +119,6 @@ static void ucp_tag_rndv_unpack_mrail_rkeys(ucp_request_t *req, void *rkey_buf) packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size; } } - - req->flags |= UCP_REQUEST_FLAG_RNDV_MRAIL; } static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) @@ -416,6 +414,10 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_get_completion, (self, status), { ucp_request_t *rndv_req = ucs_container_of(self, ucp_request_t, send.uct_comp); + if (rndv_req->flags & UCP_REQUEST_FLAG_RNDV_MRAIL) { + ucp_request_mrail_release(rndv_req); + } + if (rndv_req->send.state.offset == rndv_req->send.length) { ucs_trace_req("completed rndv get operation rndv_req: %p", rndv_req); ucp_rndv_complete_rndv_get(rndv_req); diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 723b3e1c978c..8c90319bdb05 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -20,7 +20,8 @@ enum { UCP_WIREUP_LANE_USAGE_RMA = UCS_BIT(1), UCP_WIREUP_LANE_USAGE_AMO = UCS_BIT(2), UCP_WIREUP_LANE_USAGE_RNDV = UCS_BIT(3), - UCP_WIREUP_LANE_USAGE_TAG = UCS_BIT(4) + UCP_WIREUP_LANE_USAGE_TAG = UCS_BIT(4), + UCP_WIREUP_LANE_USAGE_RNDV_MRAIL = UCS_BIT(5) }; @@ -396,6 +397,23 @@ static int ucp_wireup_compare_lane_amo_score(const void *elem1, const void *elem return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, amo); } +static uint64_t ucp_wireup_unset_tl_by_md(ucp_ep_h ep, uint64_t tl_bitmap, + ucp_rsc_index_t rsc_index) +{ + ucp_worker_h worker = ep->worker; + ucp_context_h context = worker->context; + ucp_rsc_index_t md_index = context->tl_rscs[rsc_index].md_index; + ucp_rsc_index_t i; + + for (i = 0; i < context->num_tls; i++) { + if (context->tl_rscs[i].md_index == md_index) { + tl_bitmap &= ~UCS_BIT(i); + } + } + + return tl_bitmap; +} + static UCS_F_NOINLINE ucs_status_t ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, const ucp_address_entry_t *address_list, @@ -440,26 +458,36 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, dst_md_index = address_list_copy[addr_index].md_index; reg_score = score; + if (!(usage & UCP_WIREUP_LANE_USAGE_RNDV_MRAIL)) { + /* Select additional transports which can access allocated memory, but only + * if their scores are better. We need this because a remote memory block can + * be potentially allocated using one of them, and we might get better performance + * than the transports which support only registered remote memory. + */ + snprintf(title, sizeof(title), criteria->title, "allocated"); + mem_criteria.title = title; + mem_criteria.remote_md_flags = UCT_MD_FLAG_ALLOC; + } else { + tl_bitmap = ucp_wireup_unset_tl_by_md(ep, tl_bitmap, rsc_index); + + if (strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni")) { + /* a temporary workaround to prevent the ugni uct from using rndv */ + goto out_free_address_list; + } + } + /* Add to the list of lanes and remove all occurrences of the remote md * from the address list, to avoid selecting the same remote md again.*/ ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, dst_md_index, score, usage, 0); remote_md_map &= ~UCS_BIT(dst_md_index); - /* Select additional transports which can access allocated memory, but only - * if their scores are better. We need this because a remote memory block can - * be potentially allocated using one of them, and we might get better performance - * than the transports which support only registered remote memory. - */ - snprintf(title, sizeof(title), criteria->title, "allocated"); - mem_criteria.title = title; - mem_criteria.remote_md_flags = UCT_MD_FLAG_ALLOC; - while (address_count > 0) { status = ucp_wireup_select_transport(ep, address_list_copy, address_count, &mem_criteria, tl_bitmap, remote_md_map, 0, &rsc_index, &addr_index, &score); - if ((status != UCS_OK) || (score <= reg_score)) { + if ((status != UCS_OK) || + ((score <= reg_score) && !(usage & UCP_WIREUP_LANE_USAGE_RNDV_MRAIL))) { break; } @@ -468,6 +496,10 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, dst_md_index, score, usage, 0); remote_md_map &= ~UCS_BIT(dst_md_index); + + if (usage & UCP_WIREUP_LANE_USAGE_RNDV_MRAIL) { + tl_bitmap = ucp_wireup_unset_tl_by_md(ep, tl_bitmap, rsc_index); + } } status = UCS_OK; @@ -690,12 +722,12 @@ static ucs_status_t ucp_wireup_add_am_lane(ucp_ep_h ep, const ucp_ep_params_t *p return UCS_OK; } -static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, - const ucp_ep_params_t *params, - unsigned address_count, - const ucp_address_entry_t *address_list, - ucp_wireup_lane_desc_t *lane_descs, - ucp_lane_index_t *num_lanes_p) +static ucs_status_t ucp_wireup_add_rndv_lanes(ucp_ep_h ep, + const ucp_ep_params_t *params, + unsigned address_count, + const ucp_address_entry_t *address_list, + ucp_wireup_lane_desc_t *lane_descs, + ucp_lane_index_t *num_lanes_p) { ucp_wireup_criteria_t criteria; ucp_rsc_index_t rsc_index; @@ -721,17 +753,24 @@ static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS; } - status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, - -1, -1, 0, &rsc_index, &addr_index, &score); - if ((status == UCS_OK) && - /* a temporary workaround to prevent the ugni uct from using rndv */ - (strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") == NULL)) { - ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, - address_list[addr_index].md_index, score, - UCP_WIREUP_LANE_USAGE_RNDV, 0); + if (ep->worker->context->config.ext.rndv_mrail) { + status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, + -1, -1, 0, &rsc_index, &addr_index, &score); + if ((status == UCS_OK) && + /* a temporary workaround to prevent the ugni uct from using rndv */ + (strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") == NULL)) { + ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, + address_list[addr_index].md_index, score, + UCP_WIREUP_LANE_USAGE_RNDV, 0); + } + return status; + } else { + return ucp_wireup_add_memaccess_lanes(ep, address_count, address_list, + lane_descs, num_lanes_p, &criteria, + -1, + UCP_WIREUP_LANE_USAGE_RNDV | + UCP_WIREUP_LANE_USAGE_RNDV_MRAIL); } - - return UCS_OK; } /* Lane for transport offloaded tag interface */ @@ -886,8 +925,8 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, return status; } - status = ucp_wireup_add_rndv_lane(ep, params, address_count, address_list, - lane_descs, &key->num_lanes); + status = ucp_wireup_add_rndv_lanes(ep, params, address_count, address_list, + lane_descs, &key->num_lanes); if (status != UCS_OK) { return status; } @@ -924,8 +963,8 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, } if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RNDV) { /* TODO: add rndv sort */ - ucs_assert(key->rndv_lanes[0] == UCP_NULL_LANE); - key->rndv_lanes[0] = lane; + ucs_assert(key->rndv_lanes[lane] == UCP_NULL_LANE); + key->rndv_lanes[lane] = lane; } if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RMA) { key->rma_lanes[lane] = lane; diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index b64df2a17309..a5d0c5973868 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -98,6 +98,8 @@ static ucs_status_t ucp_wireup_msg_send(ucp_ep_h ep, uint8_t type, ucs_status_t status; void *address; + ucs_trace("%s: type: %d", __FUNCTION__, type); + ucs_assert(ep->cfg_index != (uint8_t)-1); /* We cannot allocate from memory pool because it's not thread safe diff --git a/test/examples/ucp_hello_world.c b/test/examples/ucp_hello_world.c index 31f0618e0154..74b914d0ee92 100644 --- a/test/examples/ucp_hello_world.c +++ b/test/examples/ucp_hello_world.c @@ -415,7 +415,7 @@ int main(int argc, char **argv) ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_REQUEST_SIZE | UCP_PARAM_FIELD_REQUEST_INIT; - ucp_params.features = UCP_FEATURE_TAG; + ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA; if (ucp_test_mode == TEST_MODE_WAIT || ucp_test_mode == TEST_MODE_EVENTFD) { ucp_params.features |= UCP_FEATURE_WAKEUP; }