From 93d6eb9016d04651a7afd0e3c8e12be92d2e2da9 Mon Sep 17 00:00:00 2001 From: Sergey Oblomov Date: Wed, 8 Nov 2017 10:39:15 +0200 Subject: [PATCH] UCX/RNDV: multirail - updated EP configuration - added array of lanes to be used by rndv-mrail - added array of memory handles for multirail --- src/ucp/core/ucp_context.c | 4 + src/ucp/core/ucp_context.h | 2 + src/ucp/core/ucp_ep.c | 50 ++++++------ src/ucp/core/ucp_ep.h | 10 ++- src/ucp/core/ucp_ep.inl | 23 +++--- src/ucp/core/ucp_request.c | 8 +- src/ucp/core/ucp_request.h | 13 +-- src/ucp/core/ucp_request.inl | 9 ++- src/ucp/core/ucp_types.h | 1 + src/ucp/core/ucp_worker.c | 3 +- src/ucp/dt/dt.h | 26 +++++- src/ucp/proto/proto_am.inl | 2 +- src/ucp/rma/basic_rma.c | 6 +- src/ucp/tag/offload.c | 2 +- src/ucp/tag/rndv.c | 152 +++++++++++++++++++++++++---------- src/ucp/wireup/select.c | 5 +- 16 files changed, 219 insertions(+), 97 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index 6b98889f4ed9..564e732ff723 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -100,6 +100,10 @@ static ucs_config_field_t ucp_config_table[] = { "the eager_zcopy protocol", ucs_offsetof(ucp_config_t, ctx.rndv_perf_diff), UCS_CONFIG_TYPE_DOUBLE}, + {"MAX_RNDV_LANES", "1", + "Set max multirail-get rendezvous lane numbers", + ucs_offsetof(ucp_config_t, ctx.max_rndv_lanes), UCS_CONFIG_TYPE_UINT}, + {"ZCOPY_THRESH", "auto", "Threshold for switching from buffer copy to zero copy protocol", ucs_offsetof(ucp_config_t, ctx.zcopy_thresh), UCS_CONFIG_TYPE_MEMUNITS}, diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index ac7acc121cea..58362cacded0 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -53,6 +53,8 @@ typedef struct ucp_context_config { int use_mt_mutex; /** On-demand progress */ int adaptive_progress; + /** Rendezvous multirail support */ + unsigned max_rndv_lanes; } ucp_context_config_t; diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 067706a0b844..402d324a8ce5 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -38,15 +38,16 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key) { memset(key, 0, sizeof(*key)); key->num_lanes = 0; + key->num_rndv_lanes = 0; key->am_lane = UCP_NULL_LANE; - key->rndv_lane = UCP_NULL_LANE; key->wireup_lane = UCP_NULL_LANE; key->tag_lane = UCP_NULL_LANE; key->reachable_md_map = 0; key->err_mode = UCP_ERR_HANDLING_MODE_NONE; key->status = UCS_OK; - memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes)); - memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes)); + memset(key->rndv_lanes, UCP_NULL_LANE, sizeof(key->rndv_lanes)); + memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes)); + memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes)); } ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid, @@ -150,7 +151,7 @@ ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid, key.lanes[0].rsc_index = UCP_NULL_RESOURCE; key.lanes[0].dst_md_index = UCP_NULL_RESOURCE; key.am_lane = 0; - key.rndv_lane = 0; + key.rndv_lanes[0] = 0; key.wireup_lane = 0; ep->cfg_index = ucp_worker_get_ep_config(worker, &key); @@ -481,16 +482,17 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, ucp_lane_index_t lane; - if ((key1->num_lanes != key2->num_lanes) || - memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) || - memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) || - (key1->reachable_md_map != key2->reachable_md_map) || - (key1->am_lane != key2->am_lane) || - (key1->rndv_lane != key2->rndv_lane) || - (key1->tag_lane != key2->tag_lane) || - (key1->wireup_lane != key2->wireup_lane) || - (key1->err_mode != key2->err_mode) || - (key1->status != key2->status)) + if ((key1->num_lanes != key2->num_lanes) || + (key1->num_rndv_lanes != key2->num_rndv_lanes) || + memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) || + memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) || + memcmp(key1->rndv_lanes, key2->rndv_lanes, sizeof(key1->rndv_lanes)) || + (key1->reachable_md_map != key2->reachable_md_map) || + (key1->am_lane != key2->am_lane) || + (key1->tag_lane != key2->tag_lane) || + (key1->wireup_lane != key2->wireup_lane) || + (key1->err_mode != key2->err_mode) || + (key1->status != key2->status)) { return 0; } @@ -779,7 +781,8 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) if (!ucp_ep_is_tag_offload_enabled(config)) { /* Tag offload is disabled, AM will be used for all * tag-matching protocols */ - ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lane, + /* TODO: set threshold level based on all available lanes */ + ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lanes[0], UCT_IFACE_FLAG_GET_ZCOPY, max_rndv_thresh); config->tag.eager = config->am; @@ -793,7 +796,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) /* Configuration for remote memory access */ for (lane = 0; lane < config->key.num_lanes; ++lane) { - if (ucp_ep_config_get_rma_prio(config->key.rma_lanes, lane) == -1) { + if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) { continue; } @@ -897,8 +900,8 @@ static void ucp_ep_config_print_rma_proto(FILE *stream, const char *name, fprintf(stream, "..(inf)\n"); } -int ucp_ep_config_get_rma_prio(const ucp_lane_index_t *lanes, - ucp_lane_index_t lane) +int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes, + ucp_lane_index_t lane) { int prio; for (prio = 0; prio < UCP_MAX_LANES; ++prio) { @@ -956,13 +959,13 @@ void ucp_ep_config_lane_info_str(ucp_context_h context, snprintf(p, endp - p, "md[%d]", key->lanes[lane].dst_md_index); p += strlen(p); - prio = ucp_ep_config_get_rma_prio(key->rma_lanes, lane); + prio = ucp_ep_config_get_multi_lane_prio(key->rma_lanes, lane); if (prio != -1) { snprintf(p, endp - p, " rma#%d", prio); p += strlen(p); } - prio = ucp_ep_config_get_rma_prio(key->amo_lanes, lane); + prio = ucp_ep_config_get_multi_lane_prio(key->amo_lanes, lane); if (prio != -1) { snprintf(p, endp - p, " amo#%d", prio); p += strlen(p); @@ -973,8 +976,9 @@ void ucp_ep_config_lane_info_str(ucp_context_h context, p += strlen(p); } - if (lane == key->rndv_lane) { - snprintf(p, endp - p, " zcopy_rndv"); + prio = ucp_ep_config_get_multi_lane_prio(key->rndv_lanes, lane); + if (prio != -1) { + snprintf(p, endp - p, " zcopy_rndv#%d", prio); p += strlen(p); } @@ -1028,7 +1032,7 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker, if (context->config.features & UCP_FEATURE_RMA) { for (lane = 0; lane < config->key.num_lanes; ++lane) { - if (ucp_ep_config_get_rma_prio(config->key.rma_lanes, lane) == -1) { + if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) { continue; } ucp_ep_config_print_rma_proto(stream, "put", lane, diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index f18bc24b4121..0e378a0b3ea3 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -62,6 +62,8 @@ typedef struct ucp_ep_config_key { ucp_lane_index_t num_lanes; /* Number of active lanes */ + ucp_lane_index_t num_rndv_lanes; /* Number of rendezvous lanes */ + struct { ucp_rsc_index_t rsc_index; /* Resource index */ ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy @@ -71,10 +73,12 @@ typedef struct ucp_ep_config_key { } lanes[UCP_MAX_LANES]; ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */ - ucp_lane_index_t rndv_lane; /* Lane for zcopy Rendezvous (can be NULL) */ ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */ ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */ + /* Lane for zcopy rendezvous (can be NULL) */ + ucp_lane_index_t rndv_lanes[UCP_MAX_LANES]; + /* Lanes for remote memory access, sorted by priority, highest first */ ucp_lane_index_t rma_lanes[UCP_MAX_LANES]; @@ -242,8 +246,8 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config); int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); -int ucp_ep_config_get_rma_prio(const ucp_lane_index_t *lanes, - ucp_lane_index_t lane); +int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes, + ucp_lane_index_t lane); size_t ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt, const uct_linear_growth_t *reg_cost, diff --git a/src/ucp/core/ucp_ep.inl b/src/ucp/core/ucp_ep.inl index 3e347649b9f3..3b5d58e12f9c 100644 --- a/src/ucp/core/ucp_ep.inl +++ b/src/ucp/core/ucp_ep.inl @@ -32,10 +32,10 @@ static inline ucp_lane_index_t ucp_ep_get_wireup_msg_lane(ucp_ep_h ep) return (lane == UCP_NULL_LANE) ? ucp_ep_get_am_lane(ep) : lane; } -static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep) +static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep, ucp_lane_index_t idx) { - ucs_assert(ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE); - return ucp_ep_config(ep)->key.rndv_lane; + ucs_assert(ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE); + return ucp_ep_config(ep)->key.rndv_lanes[idx]; } static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep) @@ -44,9 +44,14 @@ static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep) return ucp_ep_config(ep)->key.tag_lane; } -static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep) +static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep, ucp_lane_index_t idx) { - return ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE; + return ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE; +} + +static inline int ucp_ep_rndv_num_lanes(ucp_ep_h ep) +{ + return ucp_ep_config(ep)->key.num_rndv_lanes; } static inline int ucp_ep_is_tag_offload_enabled(ucp_ep_config_t *config) @@ -65,9 +70,9 @@ static inline uct_ep_h ucp_ep_get_am_uct_ep(ucp_ep_h ep) return ep->uct_eps[ucp_ep_get_am_lane(ep)]; } -static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep) +static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep, ucp_lane_index_t idx) { - return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep)]; + return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep, idx)]; } static inline uct_ep_h ucp_ep_get_tag_uct_ep(ucp_ep_h ep) @@ -120,9 +125,9 @@ static inline const uct_md_attr_t* ucp_ep_md_attr(ucp_ep_h ep, ucp_lane_index_t return &context->tl_mds[ucp_ep_md_index(ep, lane)].attr; } -static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep) +static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep, ucp_lane_index_t idx) { - return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep))->cap.flags; + return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep, idx))->cap.flags; } static inline const char* ucp_ep_peer_name(ucp_ep_h ep) diff --git a/src/ucp/core/ucp_request.c b/src/ucp/core/ucp_request.c index be5b3aa322be..885e17ca8b1f 100644 --- a/src/ucp/core/ucp_request.c +++ b/src/ucp/core/ucp_request.c @@ -191,7 +191,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg, switch (datatype & UCP_DATATYPE_CLASS_MASK) { case UCP_DATATYPE_CONTIG: status = uct_md_mem_reg(uct_md, buffer, length, UCT_MD_MEM_ACCESS_RMA, - &state->dt.contig.memh); + &state->dt.contig[0].memh); break; case UCP_DATATYPE_IOV: iovcnt = state->dt.iov.iovcnt; @@ -245,8 +245,9 @@ UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg, switch (datatype & UCP_DATATYPE_CLASS_MASK) { case UCP_DATATYPE_CONTIG: - if (state->dt.contig.memh != UCT_MEM_HANDLE_NULL) { - uct_md_mem_dereg(uct_md, state->dt.contig.memh); + if (state->dt.contig[0].memh != UCT_MEM_HANDLE_NULL) { + uct_md_mem_dereg(uct_md, state->dt.contig[0].memh); + state->dt.contig[0].memh = UCT_MEM_HANDLE_NULL; } break; case UCP_DATATYPE_IOV: @@ -344,3 +345,4 @@ ucp_request_send_start(ucp_request_t *req, ssize_t max_short, return UCS_ERR_NO_PROGRESS; } + diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index e490ac96c6c4..cdbd37663063 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -117,10 +117,10 @@ struct ucp_request { } proxy; struct { - uint64_t remote_address; /* address of the sender's data buffer */ - uintptr_t remote_request; /* pointer to the sender's send request */ - uct_rkey_bundle_t rkey_bundle; - ucp_request_t *rreq; /* receive request on the recv side */ + uint64_t remote_address; /* address of the sender's data buffer */ + uintptr_t remote_request; /* pointer to the sender's send request */ + uct_rkey_bundle_t rkey_bundle; + ucp_request_t *rreq; /* receive request on the recv side */ } rndv_get; struct { @@ -206,6 +206,7 @@ typedef struct ucp_recv_desc { extern ucs_mpool_ops_t ucp_request_mpool_ops; +extern ucs_mpool_ops_t ucp_rndv_get_mpool_ops; int ucp_request_pending_add(ucp_request_t *req, ucs_status_t *req_status); @@ -215,8 +216,8 @@ ucs_status_t ucp_request_send_buffer_reg(ucp_request_t *req, ucp_lane_index_t la void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane); ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index, - void *buffer, size_t length, - ucp_datatype_t datatype, ucp_dt_state_t *state); + void *buffer, size_t length, ucp_datatype_t datatype, + ucp_dt_state_t *state); void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index, ucp_datatype_t datatype, ucp_dt_state_t *state); diff --git a/src/ucp/core/ucp_request.inl b/src/ucp/core/ucp_request.inl index 65dfd5e20ba6..92610df113e9 100644 --- a/src/ucp/core/ucp_request.inl +++ b/src/ucp/core/ucp_request.inl @@ -206,7 +206,7 @@ ucp_request_send_state_reset(ucp_request_t *req, /* Fall through */ case UCP_REQUEST_SEND_PROTO_RNDV_GET: if (UCP_DT_IS_CONTIG(req->send.datatype)) { - req->send.state.dt.dt.contig.memh = UCT_MEM_HANDLE_NULL; + ucp_dt_clear_rndv_lanes(&req->send.state.dt); } /* Fall through */ case UCP_REQUEST_SEND_PROTO_ZCOPY_AM: @@ -318,3 +318,10 @@ static UCS_F_ALWAYS_INLINE void ucp_request_send_tag_stat(ucp_request_t *req) UCP_EP_STAT_TAG_OP(req->send.ep, EAGER); } } + +static UCS_F_ALWAYS_INLINE +uct_rkey_bundle_t *ucp_tag_rndv_rkey(ucp_request_t *req) +{ + return &req->send.rndv_get.rkey_bundle; +} + diff --git a/src/ucp/core/ucp_types.h b/src/ucp/core/ucp_types.h index fd66fd96dd46..d10e91e55d1c 100644 --- a/src/ucp/core/ucp_types.h +++ b/src/ucp/core/ucp_types.h @@ -30,6 +30,7 @@ UCP_UINT_TYPE(UCP_MD_INDEX_BITS) ucp_md_map_t; /* Lanes */ #define UCP_MAX_LANES 8 +#define UCP_MAX_RNDV_LANES 4 #define UCP_NULL_LANE ((ucp_lane_index_t)-1) typedef uint8_t ucp_lane_index_t; UCP_UINT_TYPE(UCP_MAX_LANES) ucp_lane_map_t; diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 9f025eecac3c..b7c77c6b80a1 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -374,7 +374,7 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) ucp_ep_config_key_t key = ucp_ep_config(ucp_ep)->key; key.am_lane = 0; key.wireup_lane = 0; - key.rndv_lane = 0; + key.rndv_lanes[0] = 0; key.tag_lane = 0; key.amo_lanes[0] = 0; key.rma_lanes[0] = 0; @@ -1166,7 +1166,6 @@ void ucp_worker_destroy(ucp_worker_h worker) ucp_worker_remove_am_handlers(worker); ucp_worker_destroy_eps(worker); ucs_mpool_cleanup(&worker->am_mp, 1); - ucs_mpool_cleanup(&worker->reg_mp, 1); ucp_worker_close_ifaces(worker); ucp_worker_wakeup_cleanup(worker); ucs_mpool_cleanup(&worker->req_mp, 1); diff --git a/src/ucp/dt/dt.h b/src/ucp/dt/dt.h index f35b03f75b3d..816af2abe73d 100644 --- a/src/ucp/dt/dt.h +++ b/src/ucp/dt/dt.h @@ -12,6 +12,7 @@ #include "dt_iov.h" #include "dt_generic.h" +#include #include #include #include @@ -24,8 +25,8 @@ typedef struct ucp_dt_state { size_t offset; /* Total offset in overall payload. */ union { struct { - uct_mem_h memh; - } contig; + uct_mem_h memh; + } contig[UCP_MAX_RNDV_LANES]; struct { size_t iov_offset; /* Offset in the IOV item */ size_t iovcnt_offset; /* The IOV item to start copy */ @@ -119,4 +120,25 @@ ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size, } } +static UCS_F_ALWAYS_INLINE void +ucp_dt_clear_rndv_lanes(ucp_dt_state_t *state) +{ + int i; + for (i = 0; i < UCP_MAX_RNDV_LANES; i++) { + state->dt.contig[i].memh = UCT_MEM_HANDLE_NULL; + } +} + +static UCS_F_ALWAYS_INLINE int +ucp_dt_is_empty_rndv_lane(ucp_dt_state_t *state, int idx) +{ + return state->dt.contig[idx].memh == UCT_MEM_HANDLE_NULL; +} + +static UCS_F_ALWAYS_INLINE int +ucp_dt_have_rndv_lanes(ucp_dt_state_t *state) +{ + return !ucp_dt_is_empty_rndv_lane(state, 0); +} + #endif diff --git a/src/ucp/proto/proto_am.inl b/src/ucp/proto/proto_am.inl index 925781165878..10d071401e81 100644 --- a/src/ucp/proto/proto_am.inl +++ b/src/ucp/proto/proto_am.inl @@ -88,7 +88,7 @@ void ucp_dt_iov_copy_uct(uct_iov_t *iov, size_t *iovcnt, size_t max_dst_iov, case UCP_DATATYPE_CONTIG: iov[0].buffer = (void *)src_iov + state->offset; iov[0].length = length_max; - iov[0].memh = state->dt.contig.memh; + iov[0].memh = state->dt.contig[0].memh; iov[0].stride = 0; iov[0].count = 1; diff --git a/src/ucp/rma/basic_rma.c b/src/ucp/rma/basic_rma.c index e9b55c195675..ad012a3cdb1a 100644 --- a/src/ucp/rma/basic_rma.c +++ b/src/ucp/rma/basic_rma.c @@ -47,7 +47,7 @@ ucp_rma_request_advance(ucp_request_t *req, ssize_t frag_length, if (req->send.length == 0) { /* bcopy is the fast path */ if (ucs_likely(req->send.state.uct_comp.count == 0)) { - if (ucs_unlikely(req->send.state.dt.dt.contig.memh != + if (ucs_unlikely(req->send.state.dt.dt.contig[0].memh != UCT_MEM_HANDLE_NULL)) { ucp_request_send_buffer_dereg(req, req->send.lane); } @@ -156,7 +156,7 @@ static ucs_status_t ucp_progress_put(uct_pending_req_t *self) iov.buffer = (void *)req->send.buffer; iov.length = packed_len; iov.count = 1; - iov.memh = req->send.state.dt.dt.contig.memh; + iov.memh = req->send.state.dt.dt.contig[0].memh; status = UCS_PROFILE_CALL(uct_ep_put_zcopy, ep->uct_eps[lane], @@ -200,7 +200,7 @@ static ucs_status_t ucp_progress_get(uct_pending_req_t *self) iov.buffer = (void *)req->send.buffer; iov.length = frag_length; iov.count = 1; - iov.memh = req->send.state.dt.dt.contig.memh; + iov.memh = req->send.state.dt.dt.contig[0].memh; status = UCS_PROFILE_CALL(uct_ep_get_zcopy, ep->uct_eps[lane], diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index f351c1066a6a..152568b90054 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -270,7 +270,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) req->recv.rdesc = NULL; iov.buffer = (void*)req->recv.buffer; - iov.memh = req->recv.state.dt.contig.memh; + iov.memh = req->recv.state.dt.contig[0].memh; } else { rdesc = ucp_worker_mpool_get(worker); if (rdesc == NULL) { diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 9636aa1e6647..772635fa5a65 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -13,19 +13,27 @@ #include -static int ucp_tag_rndv_is_get_op_possible(ucp_ep_h ep, ucp_lane_index_t lane, - uct_rkey_t rkey) +static int ucp_tag_rndv_is_get_op_possible(ucp_ep_h ep, ucp_request_t *req) { uint64_t md_flags; + int i; + uct_rkey_t rkey; ucs_assert(!ucp_ep_is_stub(ep)); - if (lane != UCP_NULL_LANE) { - md_flags = ucp_ep_md_attr(ep, lane)->cap.flags; - return (((rkey != UCT_INVALID_RKEY) && (md_flags & UCT_MD_FLAG_REG)) || - !(md_flags & UCT_MD_FLAG_NEED_RKEY)); - } else { + + if (!ucp_ep_rndv_num_lanes(ep)) { return 0; } + + for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) { + md_flags = ucp_ep_rndv_md_flags(ep, i); + rkey = ucp_tag_rndv_rkey(req)->rkey; + if ((md_flags & UCT_MD_FLAG_NEED_RKEY) && (rkey == UCT_INVALID_RKEY)) { + return 0; + } + } + + return 1; } static void ucp_rndv_rma_request_send_buffer_dereg(ucp_request_t *sreq) @@ -36,9 +44,9 @@ static void ucp_rndv_rma_request_send_buffer_dereg(ucp_request_t *sreq) * (state->dt.contig.memh != UCT_MEM_HANDLE_NULL) */ if (UCP_DT_IS_CONTIG(sreq->send.datatype) && - ucp_ep_is_rndv_lane_present(sreq->send.ep) && + ucp_ep_is_rndv_lane_present(sreq->send.ep, 0) && ucp_request_is_send_buffer_reg(sreq)) { - ucp_request_send_buffer_dereg(sreq, ucp_ep_get_rndv_get_lane(sreq->send.ep)); + ucp_request_send_buffer_dereg(sreq, ucp_ep_get_rndv_get_lane(sreq->send.ep, 0)); } } @@ -52,13 +60,13 @@ size_t ucp_tag_rndv_pack_rkey(ucp_request_t *sreq, ucp_lane_index_t lane, /* Check if the sender needs to register the send buffer - * is its datatype contiguous and does the receive side need it */ - if (ucp_ep_rndv_md_flags(ep) & UCT_MD_FLAG_NEED_RKEY) { + if (ucp_ep_rndv_md_flags(ep, 0) & UCT_MD_FLAG_NEED_RKEY) { status = ucp_request_send_buffer_reg(sreq, lane); ucs_assert_always(status == UCS_OK); /* if the send buffer was registered, send the rkey */ UCS_PROFILE_CALL(uct_md_mkey_pack, ucp_ep_md(ep, lane), - sreq->send.state.dt.dt.contig.memh, rkey_buf); + sreq->send.state.dt.dt.contig[0].memh, rkey_buf); *flags |= UCP_RNDV_RTS_FLAG_PACKED_RKEY; return ucp_ep_md_attr(ep, lane)->rkey_packed_size; } @@ -66,6 +74,63 @@ size_t ucp_tag_rndv_pack_rkey(ucp_request_t *sreq, ucp_lane_index_t lane, return 0; } +static size_t ucp_tag_rndv_pack_rkeys(ucp_request_t *sreq, void *rkey_buf, uint16_t *flags) +{ + ucp_ep_t *ep = sreq->send.ep; + ucp_dt_state_t *state = &sreq->send.state.dt; + size_t packet = 0; + int i; + ucp_lane_index_t lane; + ucs_status_t status; + + ucs_assert(UCP_DT_IS_CONTIG(sreq->send.datatype)); + + for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) { + lane = ucp_ep_get_rndv_get_lane(ep, i); + + status = ucp_request_send_buffer_reg(sreq, lane); + ucs_assert_always(status == UCS_OK); + + if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_RKEY) { + UCS_PROFILE_CALL(uct_md_mkey_pack, ucp_ep_md(ep, lane), + state->dt.contig[i].memh, rkey_buf + packet); + packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size; + } + } + *flags |= UCP_RNDV_RTS_FLAG_PACKED_RKEY; + + ucs_assert_always(packet <= ucp_ep_config(ep)->am.max_bcopy); + + return packet; +} + +static void ucp_tag_rndv_unpack_rkeys(ucp_request_t *req, void *rkey_buf, uint16_t flags) +{ + ucp_ep_t *ep = req->send.ep; + size_t packet = 0; + ucp_lane_index_t rkeys; + ucp_lane_index_t lane; + int i; + + ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype)); + + if (flags & (UCP_EP_FLAG_TAG_OFFLOAD_ENABLED | UCP_RNDV_RTS_FLAG_PACKED_RKEY)) { + /* tag offload operates by single remote key only */ + rkeys = ucs_min(1, ucp_ep_rndv_num_lanes(ep)); + } else { + rkeys = ucp_ep_rndv_num_lanes(ep); + } + + for (i = 0; i < rkeys; i++) { + lane = ucp_ep_get_rndv_get_lane(ep, i); + if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_RKEY) { + UCS_PROFILE_CALL(uct_rkey_unpack, rkey_buf + packet, + ucp_tag_rndv_rkey(req)); + packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size; + } + } +} + static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) { ucp_request_t *sreq = arg; /* the sender's request */ @@ -81,10 +146,9 @@ static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) rndv_rts_hdr->size = sreq->send.length; if (UCP_DT_IS_CONTIG(sreq->send.datatype)) { rndv_rts_hdr->address = (uintptr_t) sreq->send.buffer; - if (ucp_ep_is_rndv_lane_present(ep)) { - packed_len += ucp_tag_rndv_pack_rkey(sreq, ucp_ep_get_rndv_get_lane(ep), - rndv_rts_hdr + 1, - &rndv_rts_hdr->flags); + if (ucp_ep_is_rndv_lane_present(ep, 0)) { + packed_len += ucp_tag_rndv_pack_rkeys(sreq, rndv_rts_hdr + 1, + &rndv_rts_hdr->flags); } } else if (UCP_DT_IS_GENERIC(sreq->send.datatype) || UCP_DT_IS_IOV(sreq->send.datatype)) { @@ -185,10 +249,6 @@ static void ucp_rndv_complete_rndv_get(ucp_request_t *rndv_req) UCS_PROFILE_REQUEST_EVENT(rreq, "complete_rndv_get", 0); // TODO ucp_request_complete_recv(rreq, UCS_OK); - if (rndv_req->send.rndv_get.rkey_bundle.rkey != UCT_INVALID_RKEY) { - uct_rkey_release(&rndv_req->send.rndv_get.rkey_bundle); - } - ucp_rndv_rma_request_send_buffer_dereg(rndv_req); ucp_rndv_send_ats(rndv_req, rndv_req->send.rndv_get.remote_request); @@ -236,6 +296,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), uct_pending_req_t *self) { ucp_request_t *rndv_req = ucs_container_of(self, ucp_request_t, send.uct); + ucp_ep_h ep = rndv_req->send.ep; ucs_status_t status; size_t offset, length, ucp_mtu, align; const size_t max_iovcnt = 1; @@ -243,13 +304,14 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), size_t iovcnt; ucp_rsc_index_t rsc_index; ucp_dt_state_t state; + ucp_lane_index_t lane; + uct_rkey_t rkey; - if (ucp_ep_is_stub(rndv_req->send.ep)) { + if (ucp_ep_is_stub(ep)) { return UCS_ERR_NO_RESOURCE; } - if (!(ucp_tag_rndv_is_get_op_possible(rndv_req->send.ep, rndv_req->send.lane, - rndv_req->send.rndv_get.rkey_bundle.rkey))) { + if (!(ucp_tag_rndv_is_get_op_possible(ep, rndv_req))) { /* can't perform get_zcopy - switch to AM rndv */ if (rndv_req->send.rndv_get.rkey_bundle.rkey != UCT_INVALID_RKEY) { uct_rkey_release(&rndv_req->send.rndv_get.rkey_bundle); @@ -266,10 +328,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), ucp_mtu = rndv_req->send.ep->worker->ifaces[rsc_index].attr.cap.get.align_mtu; ucs_trace_data("ep: %p try to progress get_zcopy for rndv get. rndv_req: %p. lane: %d", - rndv_req->send.ep, rndv_req, rndv_req->send.lane); + ep, rndv_req, rndv_req->send.lane); /* rndv_req is the internal request to perform the get operation */ - if (rndv_req->send.state.dt.dt.contig.memh == UCT_MEM_HANDLE_NULL) { + if (rndv_req->send.state.dt.dt.contig[0].memh == UCT_MEM_HANDLE_NULL) { /* TODO Not all UCTs need registration on the recv side */ UCS_PROFILE_REQUEST_EVENT(rndv_req->send.rndv_get.rreq, "rndv_recv_reg", 0); status = ucp_request_send_buffer_reg(rndv_req, rndv_req->send.lane); @@ -286,6 +348,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), ucp_ep_config(rndv_req->send.ep)->tag.rndv.max_get_zcopy); } + if (length == 0) { + return UCS_OK; + } + ucs_trace_data("offset %zu remainder %zu. read to %p len %zu", offset, (uintptr_t)rndv_req->send.buffer % align, (void*)rndv_req->send.buffer + offset, length); @@ -293,10 +359,14 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), state = rndv_req->send.state.dt; ucp_dt_iov_copy_uct(iov, &iovcnt, max_iovcnt, &state, rndv_req->send.buffer, ucp_dt_make_contig(1), length); - status = uct_ep_get_zcopy(rndv_req->send.ep->uct_eps[rndv_req->send.lane], + + lane = ucp_ep_get_rndv_get_lane(ep, 0); + rkey = rndv_req->send.rndv_get.rkey_bundle.rkey; + + status = uct_ep_get_zcopy(ep->uct_eps[lane], iov, iovcnt, rndv_req->send.rndv_get.remote_address + offset, - rndv_req->send.rndv_get.rkey_bundle.rkey, + rkey, &rndv_req->send.state.uct_comp); ucp_request_send_state_advance(rndv_req, &state, UCP_REQUEST_SEND_PROTO_RNDV_GET, @@ -305,8 +375,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), if (rndv_req->send.state.uct_comp.count == 0) { ucp_rndv_complete_rndv_get(rndv_req); } - return UCS_OK; + } else if (status == UCS_OK) { + /* in case if not all chunks are transmitted - return in_progress + * status */ + return UCS_INPROGRESS; } + return status; } @@ -338,10 +412,7 @@ static void ucp_rndv_handle_recv_contig(ucp_request_t *rndv_req, ucp_request_t * rndv_req->send.proto.remote_request = rndv_rts_hdr->sreq.reqptr; rndv_req->send.proto.rreq_ptr = (uintptr_t) rreq; } else { - if (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_PACKED_RKEY) { - UCS_PROFILE_CALL(uct_rkey_unpack, rndv_rts_hdr + 1, - &rndv_req->send.rndv_get.rkey_bundle); - } + ucp_tag_rndv_unpack_rkeys(rndv_req, rndv_rts_hdr + 1, rndv_rts_hdr->flags); /* rndv_req is the request that would perform the get operation */ rndv_req->send.uct.func = ucp_proto_progress_rndv_get_zcopy; rndv_req->send.buffer = rreq->recv.buffer; @@ -352,9 +423,9 @@ static void ucp_rndv_handle_recv_contig(ucp_request_t *rndv_req, ucp_request_t * if (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_OFFLOAD) { rndv_req->send.lane = ucp_ep_get_tag_lane(rndv_req->send.ep); } else { - rndv_req->send.lane = ucp_ep_get_rndv_get_lane(rndv_req->send.ep); + rndv_req->send.lane = ucp_ep_get_rndv_get_lane(rndv_req->send.ep, 0); } - rndv_req->send.state.dt.dt.contig.memh = UCT_MEM_HANDLE_NULL; + rndv_req->send.state.dt.dt.contig[0].memh = UCT_MEM_HANDLE_NULL; rndv_req->send.rndv_get.remote_request = rndv_rts_hdr->sreq.reqptr; rndv_req->send.rndv_get.remote_address = rndv_rts_hdr->address; rndv_req->send.rndv_get.rreq = rreq; @@ -402,13 +473,13 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), /* if the receive side is not connected yet then the RTS was received on a stub ep */ if (ucp_ep_is_stub(ep)) { ucs_debug("received rts on a stub ep, ep=%p, rndv_lane=%d, " - "am_lane=%d", ep, ucp_ep_is_rndv_lane_present(ep) ? - ucp_ep_get_rndv_get_lane(ep): UCP_NULL_LANE, + "am_lane=%d", ep, ucp_ep_is_rndv_lane_present(ep, 0) ? + ucp_ep_get_rndv_get_lane(ep, 0): UCP_NULL_LANE, ucp_ep_get_am_lane(ep)); } if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) { - if ((rndv_rts_hdr->address != 0) && (ucp_ep_is_rndv_lane_present(ep) || + if ((rndv_rts_hdr->address != 0) && (ucp_ep_is_rndv_lane_present(ep, 0) || (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_OFFLOAD))) { /* read the data from the sender with a get_zcopy operation on the * rndv lane */ @@ -626,14 +697,13 @@ static void ucp_rndv_prepare_zcopy_send_buffer(ucp_request_t *sreq, ucp_ep_h ep) if ((sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) && (ucp_ep_get_am_lane(ep) != ucp_ep_get_tag_lane(ep))) { ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep)); - sreq->send.state.dt.dt.contig.memh = UCT_MEM_HANDLE_NULL; - } else if ((ucp_ep_is_rndv_lane_present(ep)) && - (ucp_ep_get_am_lane(ep) != ucp_ep_get_rndv_get_lane(ep))) { + } else if ((ucp_ep_is_rndv_lane_present(ep, 0)) && + (ucp_ep_get_am_lane(ep) != ucp_ep_get_rndv_get_lane(ep, 0))) { /* dereg the original send request since we are going to send on the AM lane next */ ucp_rndv_rma_request_send_buffer_dereg(sreq); - sreq->send.state.dt.dt.contig.memh = UCT_MEM_HANDLE_NULL; } - if (sreq->send.state.dt.dt.contig.memh == UCT_MEM_HANDLE_NULL) { + + if (sreq->send.state.dt.dt.contig[0].memh == UCT_MEM_HANDLE_NULL) { /* register the send buffer for the zcopy operation */ status = ucp_request_send_buffer_reg(sreq, ucp_ep_get_am_lane(ep)); ucs_assert_always(status == UCS_OK); diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index e02f75b6c9c4..ab59985dd27a 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -970,8 +970,9 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, key->am_lane = lane; } if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RNDV) { - ucs_assert(key->rndv_lane == UCP_NULL_LANE); - key->rndv_lane = lane; + ucs_assert(key->rndv_lanes[0] == UCP_NULL_LANE); + key->rndv_lanes[0] = lane; + key->num_rndv_lanes = 1; } if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RMA) { key->rma_lanes[lane] = lane;