From 3dd33cc3d572140fcc195cc806c2f14a0118c74f Mon Sep 17 00:00:00 2001 From: Sergey Oblomov Date: Fri, 6 Oct 2017 17:10:04 +0300 Subject: [PATCH] UCP:rndv multirail support infrastructure + protocol - added infrastructure to support multi-rail on rndv-RMA: - updated rndv-rma lane to array - added memory handles to store registered buffer and remote keys --- src/ucp/core/ucp_ep.c | 35 +++++----- src/ucp/core/ucp_ep.h | 4 +- src/ucp/core/ucp_ep.inl | 24 ++++--- src/ucp/core/ucp_request.c | 3 +- src/ucp/core/ucp_request.h | 9 ++- src/ucp/core/ucp_request.inl | 66 ++++++++++++++++++ src/ucp/core/ucp_types.h | 3 +- src/ucp/core/ucp_worker.c | 2 +- src/ucp/dt/dt.h | 11 ++- src/ucp/rma/basic_rma.c | 1 + src/ucp/tag/rndv.c | 132 ++++++++++++++++++++++++++++++----- src/ucp/tag/rndv.h | 5 +- src/ucp/wireup/select.c | 5 +- 13 files changed, 244 insertions(+), 56 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index f378a95a212c..ee258a18d385 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -39,14 +39,14 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key) memset(key, 0, sizeof(*key)); key->num_lanes = 0; key->am_lane = UCP_NULL_LANE; - key->rndv_lane = UCP_NULL_LANE; key->wireup_lane = UCP_NULL_LANE; key->tag_lane = UCP_NULL_LANE; key->reachable_md_map = 0; key->err_mode = UCP_ERR_HANDLING_MODE_NONE; key->status = UCS_OK; - memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes)); - memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes)); + memset(key->rndv_lanes, UCP_NULL_LANE, sizeof(key->rndv_lanes)); + memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes)); + memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes)); } ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid, @@ -150,7 +150,7 @@ ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid, key.lanes[0].rsc_index = UCP_NULL_RESOURCE; key.lanes[0].dst_md_index = UCP_NULL_RESOURCE; key.am_lane = 0; - key.rndv_lane = 0; + 
key.rndv_lanes[0] = 0; key.wireup_lane = 0; ep->cfg_index = ucp_worker_get_ep_config(worker, &key); @@ -694,16 +694,16 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, ucp_lane_index_t lane; - if ((key1->num_lanes != key2->num_lanes) || - memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) || - memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) || - (key1->reachable_md_map != key2->reachable_md_map) || - (key1->am_lane != key2->am_lane) || - (key1->rndv_lane != key2->rndv_lane) || - (key1->tag_lane != key2->tag_lane) || - (key1->wireup_lane != key2->wireup_lane) || - (key1->err_mode != key2->err_mode) || - (key1->status != key2->status)) + if ((key1->num_lanes != key2->num_lanes) || + memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) || + memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) || + memcmp(key1->rndv_lanes, key2->rndv_lanes, sizeof(key1->rndv_lanes)) || + (key1->reachable_md_map != key2->reachable_md_map) || + (key1->am_lane != key2->am_lane) || + (key1->tag_lane != key2->tag_lane) || + (key1->wireup_lane != key2->wireup_lane) || + (key1->err_mode != key2->err_mode) || + (key1->status != key2->status)) { return 0; } @@ -992,7 +992,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) if (!ucp_ep_is_tag_offload_enabled(config)) { /* Tag offload is disabled, AM will be used for all * tag-matching protocols */ - ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lane, + ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lanes[0], UCT_IFACE_FLAG_GET_ZCOPY, max_rndv_thresh); config->tag.eager = config->am; @@ -1186,8 +1186,9 @@ void ucp_ep_config_lane_info_str(ucp_context_h context, p += strlen(p); } - if (lane == key->rndv_lane) { - snprintf(p, endp - p, " zcopy_rndv"); + prio = ucp_ep_config_get_rma_prio(key->rndv_lanes, lane); + if (prio != -1) { + snprintf(p, endp - p, " zcopy_rndv#%d", prio); p += strlen(p); } diff --git 
a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index f6fd896ee1b8..bdfe8eba1438 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -71,10 +71,12 @@ typedef struct ucp_ep_config_key { } lanes[UCP_MAX_LANES]; ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */ - ucp_lane_index_t rndv_lane; /* Lane for zcopy Rendezvous (can be NULL) */ ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */ ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */ + /* Lane for zcopy Rendezvous (can be NULL), sorted by priority, highest first */ + ucp_lane_index_t rndv_lanes[UCP_MAX_LANES]; + /* Lanes for remote memory access, sorted by priority, highest first */ ucp_lane_index_t rma_lanes[UCP_MAX_LANES]; diff --git a/src/ucp/core/ucp_ep.inl b/src/ucp/core/ucp_ep.inl index 3e347649b9f3..a538a3c547bc 100644 --- a/src/ucp/core/ucp_ep.inl +++ b/src/ucp/core/ucp_ep.inl @@ -32,10 +32,10 @@ static inline ucp_lane_index_t ucp_ep_get_wireup_msg_lane(ucp_ep_h ep) return (lane == UCP_NULL_LANE) ? 
ucp_ep_get_am_lane(ep) : lane; } -static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep) +static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep, int idx) { - ucs_assert(ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE); - return ucp_ep_config(ep)->key.rndv_lane; + ucs_assert(ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE); + return ucp_ep_config(ep)->key.rndv_lanes[idx]; } static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep) @@ -44,9 +44,15 @@ static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep) return ucp_ep_config(ep)->key.tag_lane; } -static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep) +static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep, int idx) { - return ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE; + return ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE; +} + +static inline int ucp_ep_is_rndv_mrail_present(ucp_ep_h ep) +{ + return (ucp_ep_config(ep)->key.rndv_lanes[0] != UCP_NULL_LANE && + ucp_ep_config(ep)->key.rndv_lanes[1] != UCP_NULL_LANE); } static inline int ucp_ep_is_tag_offload_enabled(ucp_ep_config_t *config) @@ -65,9 +71,9 @@ static inline uct_ep_h ucp_ep_get_am_uct_ep(ucp_ep_h ep) return ep->uct_eps[ucp_ep_get_am_lane(ep)]; } -static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep) +static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep, int idx) { - return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep)]; + return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep, idx)]; } static inline uct_ep_h ucp_ep_get_tag_uct_ep(ucp_ep_h ep) @@ -120,9 +126,9 @@ static inline const uct_md_attr_t* ucp_ep_md_attr(ucp_ep_h ep, ucp_lane_index_t return &context->tl_mds[ucp_ep_md_index(ep, lane)].attr; } -static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep) +static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep, int idx) { - return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep))->cap.flags; + return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep, idx))->cap.flags; } static 
inline const char* ucp_ep_peer_name(ucp_ep_h ep) diff --git a/src/ucp/core/ucp_request.c b/src/ucp/core/ucp_request.c index c35ad58d350f..d4adea406c41 100644 --- a/src/ucp/core/ucp_request.c +++ b/src/ucp/core/ucp_request.c @@ -175,7 +175,8 @@ void ucp_iov_buffer_memh_dereg(uct_md_h uct_md, uct_mem_h *memh, UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg, (context, rsc_index, buffer, length, datatype, state), ucp_context_t *context, ucp_rsc_index_t rsc_index, void *buffer, - size_t length, ucp_datatype_t datatype, ucp_dt_state_t *state) + size_t length, ucp_datatype_t datatype, + ucp_dt_state_t *state) { ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index; uct_md_h uct_md = context->tl_mds[mdi].md; diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 52c92dbddc29..861d5c1cbf89 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -115,6 +115,12 @@ struct ucp_request { uintptr_t remote_request; /* pointer to the sender's send request */ uct_rkey_bundle_t rkey_bundle; ucp_request_t *rreq; /* receive request on the recv side */ + unsigned use_mrail; + unsigned rail_idx; + struct { + ucp_lane_index_t lane; + uct_rkey_bundle_t rkey_bundle; + } mrail[UCP_MAX_RAILS]; } rndv_get; struct { @@ -196,7 +202,8 @@ void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane); ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index, void *buffer, size_t length, - ucp_datatype_t datatype, ucp_dt_state_t *state); + ucp_datatype_t datatype, + ucp_dt_state_t *state); void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index, ucp_datatype_t datatype, ucp_dt_state_t *state); diff --git a/src/ucp/core/ucp_request.inl b/src/ucp/core/ucp_request.inl index a2e1d0b5798b..839722621c95 100644 --- a/src/ucp/core/ucp_request.inl +++ b/src/ucp/core/ucp_request.inl @@ -187,3 +187,69 @@ static UCS_F_ALWAYS_INLINE void ucp_request_send_stat(ucp_request_t *req) 
UCP_EP_STAT_TAG_OP(req->send.ep, EAGER); } } + +static UCS_F_ALWAYS_INLINE void +ucp_request_clear_rails(ucp_dt_state_t *state) { + int i; + for(i = 0; i < UCP_MAX_RAILS; i++) { + state->dt.mrail[i].memh = UCT_MEM_HANDLE_NULL; + state->dt.mrail[i].lane = UCP_NULL_LANE; + } +} + +static UCS_F_ALWAYS_INLINE int +ucp_request_is_empty_rail(ucp_dt_state_t *state, int rail) { + return state->dt.mrail[rail].memh == UCT_MEM_HANDLE_NULL || + state->dt.mrail[rail].lane == UCP_NULL_LANE; +} + +static UCS_F_ALWAYS_INLINE int +ucp_request_have_rails(ucp_dt_state_t *state) { + return !ucp_request_is_empty_rail(state, 0); +} + +static inline int ucp_request_mrail_reg(ucp_request_t *req) +{ + ucp_ep_t *ep = req->send.ep; + ucp_dt_state_t *state = &req->send.state; + int cnt = 0; + ucs_status_t status; + int i; + ucp_lane_index_t lane; + + ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype)); + + ucp_request_clear_rails(state); + + for (i = 0; (i < UCP_MAX_RAILS) && ucp_ep_is_rndv_lane_present(ep, i); i++) { + lane = ucp_ep_get_rndv_get_lane(ep, i); + + if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_RKEY) { + status = uct_md_mem_reg(ucp_ep_md(ep, lane), + (void *)req->send.buffer, req->send.length, + UCT_MD_MEM_ACCESS_RMA, &state->dt.mrail[cnt].memh); + ucs_assert_always(status == UCS_OK); + state->dt.mrail[cnt].lane = lane; + cnt++; + } + } + + return cnt; +} + +static inline void ucp_request_mrail_dereg(ucp_request_t *req) +{ + ucp_dt_state_t *state = &req->send.state; + ucs_status_t status; + int i; + + for (i = 0; i < UCP_MAX_RAILS && !ucp_request_is_empty_rail(&req->send.state, i); i++) { + status = uct_md_mem_dereg(ucp_ep_md(req->send.ep, state->dt.mrail[i].lane), + state->dt.mrail[i].memh); + ucs_assert_always(status == UCS_OK); + } + + ucp_request_clear_rails(state); +} + + diff --git a/src/ucp/core/ucp_types.h b/src/ucp/core/ucp_types.h index ff96f889525c..9a9a56d01bc3 100644 --- a/src/ucp/core/ucp_types.h +++ b/src/ucp/core/ucp_types.h @@ -29,7 +29,8 @@ typedef
ucp_rsc_index_t ucp_md_index_t; UCP_UINT_TYPE(UCP_MD_INDEX_BITS) ucp_md_map_t; /* Lanes */ -#define UCP_MAX_LANES 8 +#define UCP_MAX_LANES 16 +#define UCP_MAX_RAILS 8 #define UCP_NULL_LANE ((ucp_lane_index_t)-1) typedef uint8_t ucp_lane_index_t; UCP_UINT_TYPE(UCP_MAX_LANES) ucp_lane_map_t; diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index e8e2ad34107c..96f8e48093b1 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -356,7 +356,7 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) ucp_ep_config_key_t key = ucp_ep_config(ucp_ep)->key; key.am_lane = 0; key.wireup_lane = 0; - key.rndv_lane = 0; + key.rndv_lanes[0] = 0; key.tag_lane = 0; key.amo_lanes[0] = 0; key.rma_lanes[0] = 0; diff --git a/src/ucp/dt/dt.h b/src/ucp/dt/dt.h index f35b03f75b3d..4404b2d670b9 100644 --- a/src/ucp/dt/dt.h +++ b/src/ucp/dt/dt.h @@ -11,6 +11,7 @@ #include "dt_contig.h" #include "dt_iov.h" #include "dt_generic.h" +#include "ucp/core/ucp_types.h" #include #include @@ -24,8 +25,14 @@ typedef struct ucp_dt_state { size_t offset; /* Total offset in overall payload. 
*/ union { struct { - uct_mem_h memh; - } contig; + struct { + uct_mem_h memh; + } contig; + struct { + ucp_lane_index_t lane; + uct_mem_h memh; + } mrail[UCP_MAX_RAILS]; + }; struct { size_t iov_offset; /* Offset in the IOV item */ size_t iovcnt_offset; /* The IOV item to start copy */ diff --git a/src/ucp/rma/basic_rma.c b/src/ucp/rma/basic_rma.c index 722bad0c31f4..df5e068be369 100644 --- a/src/ucp/rma/basic_rma.c +++ b/src/ucp/rma/basic_rma.c @@ -107,6 +107,7 @@ ucp_rma_request_init(ucp_request_t *req, ucp_ep_h ep, const void *buffer, if (length < zcopy_thresh) { req->send.uct_comp.func = ucp_rma_request_bcopy_completion; req->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; + ucp_request_clear_rails(&req->send.state); return UCS_OK; } else { req->send.uct_comp.func = ucp_rma_request_zcopy_completion; diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 548d3c4c10a8..08fa638478a2 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -37,9 +37,10 @@ static void ucp_rndv_rma_request_send_buffer_dereg(ucp_request_t *sreq) * (state->dt.contig.memh != UCT_MEM_HANDLE_NULL) */ if (UCP_DT_IS_CONTIG(sreq->send.datatype) && - ucp_ep_is_rndv_lane_present(sreq->send.ep) && + ucp_ep_is_rndv_lane_present(sreq->send.ep, 0) && ucp_request_is_send_buffer_reg(sreq)) { - ucp_request_send_buffer_dereg(sreq, ucp_ep_get_rndv_get_lane(sreq->send.ep)); + ucp_request_mrail_dereg(sreq); + ucp_request_send_buffer_dereg(sreq, ucp_ep_get_rndv_get_lane(sreq->send.ep, 0)); } } @@ -53,7 +54,7 @@ size_t ucp_tag_rndv_pack_rkey(ucp_request_t *sreq, ucp_lane_index_t lane, /* Check if the sender needs to register the send buffer - * is its datatype contiguous and does the receive side need it */ - if (ucp_ep_rndv_md_flags(ep) & UCT_MD_FLAG_NEED_RKEY) { + if (ucp_ep_rndv_md_flags(ep, 0) & UCT_MD_FLAG_NEED_RKEY) { status = ucp_request_send_buffer_reg(sreq, lane); ucs_assert_always(status == UCS_OK); @@ -67,6 +68,57 @@ size_t ucp_tag_rndv_pack_rkey(ucp_request_t *sreq, ucp_lane_index_t 
lane, return 0; } +static size_t ucp_tag_rndv_pack_mrail_rkeys(ucp_request_t *sreq, void *rkey_buf, uint16_t *flags) +{ + ucp_ep_t *ep = sreq->send.ep; + ucp_dt_state_t *state = &sreq->send.state; + size_t packet = 0; + int i; + int cnt = 0; + ucp_lane_index_t lane; + + ucs_assert(UCP_DT_IS_CONTIG(sreq->send.datatype)); + + cnt = ucp_request_mrail_reg(sreq); + ucs_assert_always(cnt <= UCP_MAX_RAILS); + + if (cnt) { + for (i = 0; i < cnt && i < UCP_MAX_RAILS; i++) { + ucs_assert_always(!ucp_request_is_empty_rail(state, i)); + + lane = state->dt.mrail[i].lane; + + UCS_PROFILE_CALL(uct_md_mkey_pack, ucp_ep_md(ep, lane), + state->dt.mrail[i].memh, rkey_buf + packet); + packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size; + } + *flags |= UCP_RNDV_RTS_FLAG_PACKED_RKEY | UCP_RNDV_RTS_FLAG_PACKED_MRAIL_RKEY; + } + + return packet; +} + +static void ucp_tag_rndv_unpack_mrail_rkeys(ucp_request_t *req, void *rkey_buf) +{ + ucp_ep_t *ep = req->send.ep; + size_t packet = 0; + int i; + ucp_lane_index_t lane; + + ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype)); + ucs_assert(ucp_ep_is_rndv_mrail_present(ep)); + + for (i = 0; (i < UCP_MAX_RAILS) && ucp_ep_is_rndv_lane_present(ep, i); i++) { + lane = ucp_ep_get_rndv_get_lane(ep, i); + if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_RKEY) { + UCS_PROFILE_CALL(uct_rkey_unpack, rkey_buf + packet, + &req->send.rndv_get.mrail[i].rkey_bundle); + req->send.rndv_get.mrail[i].lane = lane; + packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size; + } + } +} + static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) { ucp_request_t *sreq = arg; /* the sender's request */ @@ -82,8 +134,12 @@ static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) rndv_rts_hdr->size = sreq->send.length; if (UCP_DT_IS_CONTIG(sreq->send.datatype)) { rndv_rts_hdr->address = (uintptr_t) sreq->send.buffer; - if (ucp_ep_is_rndv_lane_present(ep)) { - packed_len += ucp_tag_rndv_pack_rkey(sreq, ucp_ep_get_rndv_get_lane(ep), + if
(ucp_ep_is_rndv_mrail_present(ep)) { + packed_len += ucp_tag_rndv_pack_mrail_rkeys(sreq, + rndv_rts_hdr + 1, + &rndv_rts_hdr->flags); + } else if (ucp_ep_is_rndv_lane_present(ep, 0)) { + packed_len += ucp_tag_rndv_pack_rkey(sreq, ucp_ep_get_rndv_get_lane(ep, 0), rndv_rts_hdr + 1, &rndv_rts_hdr->flags); } @@ -145,6 +201,7 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq) if (UCP_DT_IS_CONTIG(sreq->send.datatype)) { sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; + ucp_request_clear_rails(&sreq->send.state); } if (sreq->send.ep->flags & UCP_EP_FLAG_TAG_OFFLOAD_ENABLED) { status = ucp_tag_offload_start_rndv(sreq); @@ -241,13 +298,17 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), size_t offset, length, ucp_mtu, align; uct_iov_t iov[1]; ucp_rsc_index_t rsc_index; + ucp_lane_index_t lane; + uct_rkey_t rkey; + unsigned rail_idx; if (ucp_ep_is_stub(rndv_req->send.ep)) { return UCS_ERR_NO_RESOURCE; } if (!(ucp_tag_rndv_is_get_op_possible(rndv_req->send.ep, rndv_req->send.lane, - rndv_req->send.rndv_get.rkey_bundle.rkey))) { + rndv_req->send.rndv_get.rkey_bundle.rkey)) && + !(rndv_req->send.rndv_get.use_mrail)) { /* can't perform get_zcopy - switch to AM rndv */ if (rndv_req->send.rndv_get.rkey_bundle.rkey != UCT_INVALID_RKEY) { uct_rkey_release(&rndv_req->send.rndv_get.rkey_bundle); @@ -267,11 +328,16 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), rndv_req->send.ep, rndv_req, rndv_req->send.lane); /* rndv_req is the internal request to perform the get operation */ - if (rndv_req->send.state.dt.contig.memh == UCT_MEM_HANDLE_NULL) { + if (!rndv_req->send.rndv_get.use_mrail && + (rndv_req->send.state.dt.contig.memh == UCT_MEM_HANDLE_NULL)) { /* TODO Not all UCTs need registration on the recv side */ UCS_PROFILE_REQUEST_EVENT(rndv_req->send.rndv_get.rreq, "rndv_recv_reg", 0); status = ucp_request_send_buffer_reg(rndv_req, rndv_req->send.lane); ucs_assert_always(status == UCS_OK); + } else 
if(rndv_req->send.rndv_get.use_mrail && + ucp_request_is_empty_rail(&rndv_req->send.state, 0)) { + ucp_request_mrail_reg(rndv_req); + rndv_req->send.rndv_get.rail_idx = 0; } offset = rndv_req->send.state.offset; @@ -288,16 +354,33 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), offset, (uintptr_t)rndv_req->send.buffer % align, (void*)rndv_req->send.buffer + offset, length); + if ((rndv_req->send.rndv_get.use_mrail) && + ((rndv_req->send.rndv_get.rail_idx >= UCP_MAX_RAILS) || + (ucp_request_is_empty_rail(&rndv_req->send.state, rndv_req->send.rndv_get.rail_idx)))) { + rndv_req->send.rndv_get.rail_idx = 0; + } + iov[0].buffer = (void*)rndv_req->send.buffer + offset; iov[0].length = length; - iov[0].memh = rndv_req->send.state.dt.contig.memh; iov[0].count = 1; iov[0].stride = 0; + if (!rndv_req->send.rndv_get.use_mrail) { + iov[0].memh = rndv_req->send.state.dt.contig.memh; + lane = rndv_req->send.lane; + rkey = rndv_req->send.rndv_get.rkey_bundle.rkey; + } else { + rail_idx = rndv_req->send.rndv_get.rail_idx; + iov[0].memh = rndv_req->send.state.dt.mrail[rail_idx].memh; + lane = rndv_req->send.rndv_get.mrail[rail_idx].lane; + rkey = rndv_req->send.rndv_get.mrail[rail_idx].rkey_bundle.rkey; + rndv_req->send.rndv_get.rail_idx++; + } + rndv_req->send.uct_comp.count++; - status = uct_ep_get_zcopy(rndv_req->send.ep->uct_eps[rndv_req->send.lane], + status = uct_ep_get_zcopy(rndv_req->send.ep->uct_eps[lane], iov, 1, rndv_req->send.rndv_get.remote_address + offset, - rndv_req->send.rndv_get.rkey_bundle.rkey, + rkey, &rndv_req->send.uct_comp); if ((status == UCS_OK) || (status == UCS_INPROGRESS)) { @@ -350,13 +433,17 @@ static void ucp_rndv_handle_recv_contig(ucp_request_t *rndv_req, ucp_request_t * rndv_req->send.proto.remote_request = rndv_rts_hdr->sreq.reqptr; rndv_req->send.proto.rreq_ptr = (uintptr_t) rreq; } else { - if (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_PACKED_RKEY) { + rndv_req->send.rndv_get.use_mrail = 0; + if (rndv_rts_hdr->flags & 
UCP_RNDV_RTS_FLAG_PACKED_MRAIL_RKEY) { + ucp_tag_rndv_unpack_mrail_rkeys(rndv_req, rndv_rts_hdr + 1); + rndv_req->send.rndv_get.use_mrail = 1; + } else if (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_PACKED_RKEY) { UCS_PROFILE_CALL(uct_rkey_unpack, rndv_rts_hdr + 1, &rndv_req->send.rndv_get.rkey_bundle); } /* rndv_req is the request that would perform the get operation */ - rndv_req->send.uct.func = ucp_proto_progress_rndv_get_zcopy; - rndv_req->send.buffer = rreq->recv.buffer; + rndv_req->send.uct.func = ucp_proto_progress_rndv_get_zcopy; + rndv_req->send.buffer = rreq->recv.buffer; rndv_req->send.length = rndv_rts_hdr->size; rndv_req->send.uct_comp.func = ucp_rndv_get_completion; rndv_req->send.uct_comp.count = 0; @@ -364,9 +451,10 @@ static void ucp_rndv_handle_recv_contig(ucp_request_t *rndv_req, ucp_request_t * if (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_OFFLOAD) { rndv_req->send.lane = ucp_ep_get_tag_lane(rndv_req->send.ep); } else { - rndv_req->send.lane = ucp_ep_get_rndv_get_lane(rndv_req->send.ep); + rndv_req->send.lane = ucp_ep_get_rndv_get_lane(rndv_req->send.ep, 0); } rndv_req->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; + ucp_request_clear_rails(&rndv_req->send.state); rndv_req->send.rndv_get.remote_request = rndv_rts_hdr->sreq.reqptr; rndv_req->send.rndv_get.remote_address = rndv_rts_hdr->address; rndv_req->send.rndv_get.rreq = rreq; @@ -414,13 +502,13 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), /* if the receive side is not connected yet then the RTS was received on a stub ep */ if (ucp_ep_is_stub(ep)) { ucs_debug("received rts on a stub ep, ep=%p, rndv_lane=%d, " - "am_lane=%d", ep, ucp_ep_is_rndv_lane_present(ep) ? - ucp_ep_get_rndv_get_lane(ep): UCP_NULL_LANE, + "am_lane=%d", ep, ucp_ep_is_rndv_lane_present(ep, 0) ? 
+ ucp_ep_get_rndv_get_lane(ep, 0): UCP_NULL_LANE, ucp_ep_get_am_lane(ep)); } if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) { - if ((rndv_rts_hdr->address != 0) && (ucp_ep_is_rndv_lane_present(ep) || + if ((rndv_rts_hdr->address != 0) && (ucp_ep_is_rndv_lane_present(ep, 0) || (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_OFFLOAD))) { /* read the data from the sender with a get_zcopy operation on the * rndv lane */ @@ -637,11 +725,17 @@ static void ucp_rndv_prepare_zcopy_send_buffer(ucp_request_t *sreq, ucp_ep_h ep) (ucp_ep_get_am_lane(ep) != ucp_ep_get_tag_lane(ep))) { ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep)); sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; - } else if ((ucp_ep_is_rndv_lane_present(ep)) && - (ucp_ep_get_am_lane(ep) != ucp_ep_get_rndv_get_lane(ep))) { + ucp_request_clear_rails(&sreq->send.state); + } else if ((ucp_ep_is_rndv_lane_present(ep, 0)) && + (ucp_ep_get_am_lane(ep) != ucp_ep_get_rndv_get_lane(ep, 0))) { /* dereg the original send request since we are going to send on the AM lane next */ ucp_rndv_rma_request_send_buffer_dereg(sreq); sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; + ucp_request_clear_rails(&sreq->send.state); + } else if (ucp_request_have_rails(&sreq->send.state)) { + ucp_rndv_rma_request_send_buffer_dereg(sreq); + sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; + ucp_request_clear_rails(&sreq->send.state); } if (sreq->send.state.dt.contig.memh == UCT_MEM_HANDLE_NULL) { /* register the send buffer for the zcopy operation */ diff --git a/src/ucp/tag/rndv.h b/src/ucp/tag/rndv.h index 660cfbad4eab..97ca410d5ee3 100644 --- a/src/ucp/tag/rndv.h +++ b/src/ucp/tag/rndv.h @@ -14,8 +14,9 @@ #include enum { - UCP_RNDV_RTS_FLAG_PACKED_RKEY = UCS_BIT(0), - UCP_RNDV_RTS_FLAG_OFFLOAD = UCS_BIT(1) + UCP_RNDV_RTS_FLAG_PACKED_RKEY = UCS_BIT(0), + UCP_RNDV_RTS_FLAG_OFFLOAD = UCS_BIT(1), + UCP_RNDV_RTS_FLAG_PACKED_MRAIL_RKEY = UCS_BIT(2) /* multirail key packed */ }; /* diff --git 
a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 1f6ae4457b11..723b3e1c978c 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -923,8 +923,9 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, key->am_lane = lane; } if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RNDV) { - ucs_assert(key->rndv_lane == UCP_NULL_LANE); - key->rndv_lane = lane; + /* TODO: add rndv sort */ + ucs_assert(key->rndv_lanes[0] == UCP_NULL_LANE); + key->rndv_lanes[0] = lane; } if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RMA) { key->rma_lanes[lane] = lane;