diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 787c6f0da38..f6366d63298 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -25,10 +25,11 @@ typedef uint16_t ucp_ep_cfg_index_t; * Endpoint flags */ enum { - UCP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(0), /* All local endpoints are connected */ - UCP_EP_FLAG_REMOTE_CONNECTED = UCS_BIT(1), /* All remote endpoints are connected */ - UCP_EP_FLAG_CONNECT_REQ_SENT = UCS_BIT(2), /* Connection request was sent */ - UCP_EP_FLAG_CONNECT_REP_SENT = UCS_BIT(3), /* Debug: Connection reply was sent */ + UCP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(0), /* All local endpoints are connected */ + UCP_EP_FLAG_REMOTE_CONNECTED = UCS_BIT(1), /* All remote endpoints are connected */ + UCP_EP_FLAG_CONNECT_REQ_SENT = UCS_BIT(2), /* Connection request was sent */ + UCP_EP_FLAG_CONNECT_REP_SENT = UCS_BIT(3), /* Debug: Connection reply was sent */ + UCP_EP_FLAG_TAG_OFFLOAD_ENABLED = UCS_BIT(4) /* Endpoint uses tl offload for tag matching */ }; diff --git a/src/ucp/core/ucp_request.c b/src/ucp/core/ucp_request.c index a719cb126ee..a7f51d96dcf 100644 --- a/src/ucp/core/ucp_request.c +++ b/src/ucp/core/ucp_request.c @@ -168,9 +168,10 @@ void ucp_iov_buffer_memh_dereg(uct_md_h uct_md, uct_mem_h *memh, } } -ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index, - void *buffer, size_t length, - ucp_datatype_t datatype, ucp_dt_state_t *state) +UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg, + (context, rsc_index, buffer, length, datatype, state), + ucp_context_t *context, ucp_rsc_index_t rsc_index, void *buffer, + size_t length, ucp_datatype_t datatype, ucp_dt_state_t *state) { ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index; uct_md_h uct_md = context->tl_mds[mdi].md; @@ -224,8 +225,10 @@ ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_ return status; } -void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index, - ucp_datatype_t datatype, ucp_dt_state_t *state) +UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg, + (context, rsc_index, datatype, state), + ucp_context_t *context, ucp_rsc_index_t rsc_index, + ucp_datatype_t datatype, ucp_dt_state_t *state) { ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index; uct_md_h uct_md = context->tl_mds[mdi].md; @@ -252,8 +255,8 @@ void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index, } } -UCS_PROFILE_FUNC(ucs_status_t, ucp_request_send_buffer_reg, (req, lane), - ucp_request_t *req, ucp_lane_index_t lane) +ucs_status_t ucp_request_send_buffer_reg(ucp_request_t *req, + ucp_lane_index_t lane) { ucp_context_t *context = req->send.ep->worker->context; ucp_rsc_index_t rsc_index = ucp_ep_get_rsc_index(req->send.ep, lane); @@ -263,13 +266,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_send_buffer_reg, (req, lane), &req->send.state); } -UCS_PROFILE_FUNC_VOID(ucp_request_send_buffer_dereg, (req, lane), - ucp_request_t *req, ucp_lane_index_t lane) +void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane) { ucp_context_t *context = req->send.ep->worker->context; ucp_rsc_index_t rsc_index = ucp_ep_get_rsc_index(req->send.ep, lane); - return ucp_request_memory_dereg(context, rsc_index, req->send.datatype, - &req->send.state); + ucp_request_memory_dereg(context, rsc_index, req->send.datatype, + &req->send.state); } diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index bc3d2053a8b..695e5c04acc 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -33,9 +33,8 @@ enum { UCP_REQUEST_FLAG_RECV = UCS_BIT(7), UCP_REQUEST_FLAG_SYNC = UCS_BIT(8), UCP_REQUEST_FLAG_RNDV = UCS_BIT(9), - UCP_REQUEST_FLAG_MATCHED = UCS_BIT(10), - UCP_REQUEST_FLAG_OFFLOADED = UCS_BIT(11), - UCP_REQUEST_FLAG_BLOCK_OFFLOAD = UCS_BIT(12), + UCP_REQUEST_FLAG_OFFLOADED = UCS_BIT(10), + UCP_REQUEST_FLAG_BLOCK_OFFLOAD = UCS_BIT(11), #if ENABLE_ASSERT UCP_REQUEST_DEBUG_FLAG_EXTERNAL = UCS_BIT(15) diff --git a/src/ucp/tag/eager.h b/src/ucp/tag/eager.h index 08024a64dca..f1d81e488de 100644 --- a/src/ucp/tag/eager.h +++ b/src/ucp/tag/eager.h @@ -68,7 +68,7 @@ void ucp_tag_eager_zcopy_req_complete(ucp_request_t *req); static inline ucs_status_t ucp_tag_send_eager_short(ucp_ep_t *ep, ucp_tag_t tag, const void *buffer, size_t length) { - if (ucp_ep_is_tag_offload_enabled(ucp_ep_config(ep))) { + if (ep->flags & UCP_EP_FLAG_TAG_OFFLOAD_ENABLED) { UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(uct_tag_t)); return uct_ep_tag_eager_short(ucp_ep_get_tag_uct_ep(ep), tag, buffer, length); } else { diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index 1594241f3f1..a9f4b3ab1a7 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -73,7 +73,9 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, /* Cancel req in transport if it was offloaded, * because it arrived as unexpected */ - ucp_tag_offload_cancel(context, req, 1); + if (flags & UCP_RECV_DESC_FLAG_OFFLOAD) { + ucp_tag_offload_cancel(context, req, 1); + } if (flags & UCP_RECV_DESC_FLAG_LAST) { req->recv.info.length = recv_len; diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index 063b979573e..22eb37797ec 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -49,34 +49,29 @@ void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) ucp_worker_iface_t *ucp_iface; ucs_status_t status; - if (req->flags & UCP_REQUEST_FLAG_OFFLOADED) { - ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, - ucp_worker_iface_t, queue); - ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype, - &req->recv.state); - status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, force); - if (status != UCS_OK) { - ucs_error("Failed to cancel recv in the transport: %s", - ucs_status_string(status)); - } + ucs_assert(req->flags & UCP_REQUEST_FLAG_OFFLOADED); + + ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, + ucp_worker_iface_t, queue); + ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype, + &req->recv.state); + status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, force); + if (status != UCS_OK) { + ucs_error("Failed to cancel recv in the transport: %s", + ucs_status_string(status)); } } -void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) +int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) { size_t length = req->recv.length; ucp_worker_iface_t *ucp_iface; ucs_status_t status; uct_iov_t iov; - if (length < ctx->tm.post_thresh) { - /* Message is smaller than post threshold or offloading is disabled. */ - goto skip; - } - if (!UCP_DT_IS_CONTIG(req->recv.datatype)) { /* Non-contig buffers not supported yet. */ - goto skip; + return 0; } if ((ctx->config.tag_sender_mask & req->recv.tag_mask) != @@ -84,13 +79,13 @@ void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) /* Wildcard. * TODO add check that only offload capable iface present. In * this case can post tag as well. */ - goto skip; + return 0; } if (ctx->tm.sw_req_count) { /* There are some requests which must be completed in SW. Do not post * tags to HW until they are completed. */ - goto skip; + return 0; } ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, @@ -99,7 +94,7 @@ void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) req->recv.length, req->recv.datatype, &req->recv.state); if (status != UCS_OK) { - goto skip; + return 0; } req->recv.uct_ctx.tag_consumed_cb = ucp_tag_offload_tag_consumed; @@ -115,16 +110,12 @@ void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) &req->recv.uct_ctx); if (status != UCS_OK) { /* No more matching entries in the transport. */ - goto skip; + return 0; } req->flags |= UCP_REQUEST_FLAG_OFFLOADED; ucs_trace_req("recv request %p (%p) was posted to transport (rsc %d)", req, req + 1, ucp_iface->rsc_index); - return; - -skip: - req->flags |= UCP_REQUEST_FLAG_BLOCK_OFFLOAD; - ++ctx->tm.sw_req_count; + return 1; } static size_t ucp_tag_offload_pack_eager(void *dest, void *arg) diff --git a/src/ucp/tag/offload.h b/src/ucp/tag/offload.h index caad9d75126..2805890677b 100644 --- a/src/ucp/tag/offload.h +++ b/src/ucp/tag/offload.h @@ -21,6 +21,18 @@ ucs_status_t ucp_tag_offload_unexp_eager(void *arg, void *data, size_t length, void ucp_tag_offload_cancel(ucp_context_t *context, ucp_request_t *req, int force); -void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req); +int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req); + +static UCS_F_ALWAYS_INLINE void +ucp_tag_offload_try_post(ucp_context_t *ctx, ucp_request_t *req) +{ + if (ucs_unlikely(req->recv.length >= ctx->tm.post_thresh)) { + if (ucp_tag_offload_post(ctx, req)) { + return; + } + } + req->flags |= UCP_REQUEST_FLAG_BLOCK_OFFLOAD; + ++ctx->tm.sw_req_count; +} #endif diff --git a/src/ucp/tag/tag_match.c b/src/ucp/tag/tag_match.c index 8ca84f0165c..2fc8af95f09 100644 --- a/src/ucp/tag/tag_match.c +++ b/src/ucp/tag/tag_match.c @@ -61,7 +61,9 @@ void ucp_tag_exp_remove(ucp_tag_match_t *tm, ucp_request_t *req) ucs_queue_for_each_safe(qreq, iter, queue, recv.queue) { if (qreq == req) { - ucp_tag_offload_cancel(ctx, req, 0); + if (req->flags & UCP_REQUEST_FLAG_OFFLOADED) { + ucp_tag_offload_cancel(ctx, req, 0); + } ucs_queue_del_iter(queue, iter); return; } diff --git a/src/ucp/tag/tag_match.inl b/src/ucp/tag/tag_match.inl index acc5e8681ed..e370910d102 100644 --- a/src/ucp/tag/tag_match.inl +++ b/src/ucp/tag/tag_match.inl @@ -198,7 +198,7 @@ ucp_tag_unexp_desc_release(ucp_recv_desc_t *rdesc) ucs_trace_req("release receive descriptor %p", rdesc); if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_UCT_DESC)) { /* uct desc is slowpath */ - if (rdesc->flags & UCP_RECV_DESC_FLAG_OFFLOAD) { + if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_OFFLOAD)) { uct_iface_release_desc(rdesc); } else { uct_iface_release_desc((char*)rdesc - sizeof(ucp_eager_hdr_t)); diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index bb0a4a01d1a..e4d4d5702c3 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -79,7 +79,6 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, ucp_tag_log_match(recv_tag, rdesc->length - rdesc->hdr_len, req, tag, tag_mask, req->recv.state.offset, "unexpected"); ucp_tag_unexp_remove(rdesc); - req->flags |= UCP_REQUEST_FLAG_MATCHED; if (rdesc->flags & UCP_RECV_DESC_FLAG_EAGER) { UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0); @@ -210,13 +209,9 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, req->recv.cb = cb; ucp_tag_exp_push(&context->tm, queue, req); - if (!(req->flags & UCP_REQUEST_FLAG_MATCHED)) { - /* If offload supported, post this tag to transport as well. - * Check for match flag, because it may be a part of already - * matched message in AM protocol (thus we do not need to post - * tag to transport, because it is already being handled by AM). */ - ucp_tag_offload_post(worker->context, req); - } + /* If offload supported, post this tag to transport as well. + * TODO: need to distinguish the cases when posting is not needed. */ + ucp_tag_offload_try_post(worker->context, req); ucs_trace_req("%s returning expected request %p (%p)", debug_name, req, req + 1); } diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index a2153d78876..cd7fffc6cc3 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -496,6 +496,11 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned address_count, ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED; } + /* Cache tag offload state in the flags for fast-path */ + if (ucp_ep_is_tag_offload_enabled(ucp_ep_config(ep))) { + ep->flags |= UCP_EP_FLAG_TAG_OFFLOAD_ENABLED; + } + return UCS_OK; err: