Skip to content

Commit

Permalink
UCP: tm offload eager fixes p1
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-mikheev committed May 25, 2017
1 parent a44896d commit b4ac8be
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 57 deletions.
9 changes: 5 additions & 4 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ typedef uint16_t ucp_ep_cfg_index_t;
* Endpoint flags
*/
enum {
UCP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(0), /* All local endpoints are connected */
UCP_EP_FLAG_REMOTE_CONNECTED = UCS_BIT(1), /* All remote endpoints are connected */
UCP_EP_FLAG_CONNECT_REQ_SENT = UCS_BIT(2), /* Connection request was sent */
UCP_EP_FLAG_CONNECT_REP_SENT = UCS_BIT(3), /* Debug: Connection reply was sent */
UCP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(0), /* All local endpoints are connected */
UCP_EP_FLAG_REMOTE_CONNECTED = UCS_BIT(1), /* All remote endpoints are connected */
UCP_EP_FLAG_CONNECT_REQ_SENT = UCS_BIT(2), /* Connection request was sent */
UCP_EP_FLAG_CONNECT_REP_SENT = UCS_BIT(3), /* Debug: Connection reply was sent */
UCP_EP_FLAG_TAG_OFFLOAD_ENABLED = UCS_BIT(4) /* Endpoint uses tl offload for tag matching */
};


Expand Down
24 changes: 13 additions & 11 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ void ucp_iov_buffer_memh_dereg(uct_md_h uct_md, uct_mem_h *memh,
}
}

ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
void *buffer, size_t length,
ucp_datatype_t datatype, ucp_dt_state_t *state)
UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
(context, rsc_index, buffer, length, datatype, state),
ucp_context_t *context, ucp_rsc_index_t rsc_index, void *buffer,
size_t length, ucp_datatype_t datatype, ucp_dt_state_t *state)
{
ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index;
uct_md_h uct_md = context->tl_mds[mdi].md;
Expand Down Expand Up @@ -224,8 +225,10 @@ ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_
return status;
}

void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
ucp_datatype_t datatype, ucp_dt_state_t *state)
UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg,
(context, rsc_index, datatype, state),
ucp_context_t *context, ucp_rsc_index_t rsc_index,
ucp_datatype_t datatype, ucp_dt_state_t *state)
{
ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index;
uct_md_h uct_md = context->tl_mds[mdi].md;
Expand All @@ -252,8 +255,8 @@ void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
}
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_request_send_buffer_reg, (req, lane),
ucp_request_t *req, ucp_lane_index_t lane)
ucs_status_t ucp_request_send_buffer_reg(ucp_request_t *req,
ucp_lane_index_t lane)
{
ucp_context_t *context = req->send.ep->worker->context;
ucp_rsc_index_t rsc_index = ucp_ep_get_rsc_index(req->send.ep, lane);
Expand All @@ -263,13 +266,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_send_buffer_reg, (req, lane),
&req->send.state);
}

UCS_PROFILE_FUNC_VOID(ucp_request_send_buffer_dereg, (req, lane),
ucp_request_t *req, ucp_lane_index_t lane)
void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane)
{
ucp_context_t *context = req->send.ep->worker->context;
ucp_rsc_index_t rsc_index = ucp_ep_get_rsc_index(req->send.ep, lane);

return ucp_request_memory_dereg(context, rsc_index, req->send.datatype,
&req->send.state);
ucp_request_memory_dereg(context, rsc_index, req->send.datatype,
&req->send.state);
}

5 changes: 2 additions & 3 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ enum {
UCP_REQUEST_FLAG_RECV = UCS_BIT(7),
UCP_REQUEST_FLAG_SYNC = UCS_BIT(8),
UCP_REQUEST_FLAG_RNDV = UCS_BIT(9),
UCP_REQUEST_FLAG_MATCHED = UCS_BIT(10),
UCP_REQUEST_FLAG_OFFLOADED = UCS_BIT(11),
UCP_REQUEST_FLAG_BLOCK_OFFLOAD = UCS_BIT(12),
UCP_REQUEST_FLAG_OFFLOADED = UCS_BIT(10),
UCP_REQUEST_FLAG_BLOCK_OFFLOAD = UCS_BIT(11),

#if ENABLE_ASSERT
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = UCS_BIT(15)
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/eager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void ucp_tag_eager_zcopy_req_complete(ucp_request_t *req);
static inline ucs_status_t ucp_tag_send_eager_short(ucp_ep_t *ep, ucp_tag_t tag,
const void *buffer, size_t length)
{
if (ucp_ep_is_tag_offload_enabled(ucp_ep_config(ep))) {
if (ep->flags & UCP_EP_FLAG_TAG_OFFLOAD_ENABLED) {
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(uct_tag_t));
return uct_ep_tag_eager_short(ucp_ep_get_tag_uct_ep(ep), tag, buffer, length);
} else {
Expand Down
4 changes: 3 additions & 1 deletion src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,

/* Cancel req in transport if it was offloaded,
* because it arrived as unexpected */
ucp_tag_offload_cancel(context, req, 1);
if (flags & UCP_RECV_DESC_FLAG_OFFLOAD) {
ucp_tag_offload_cancel(context, req, 1);
}

if (flags & UCP_RECV_DESC_FLAG_LAST) {
req->recv.info.length = recv_len;
Expand Down
43 changes: 17 additions & 26 deletions src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,48 +49,43 @@ void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force)
ucp_worker_iface_t *ucp_iface;
ucs_status_t status;

if (req->flags & UCP_REQUEST_FLAG_OFFLOADED) {
ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces,
ucp_worker_iface_t, queue);
ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype,
&req->recv.state);
status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, force);
if (status != UCS_OK) {
ucs_error("Failed to cancel recv in the transport: %s",
ucs_status_string(status));
}
ucs_assert(req->flags & UCP_REQUEST_FLAG_OFFLOADED);

ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces,
ucp_worker_iface_t, queue);
ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype,
&req->recv.state);
status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, force);
if (status != UCS_OK) {
ucs_error("Failed to cancel recv in the transport: %s",
ucs_status_string(status));
}
}

void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req)
int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req)
{
size_t length = req->recv.length;
ucp_worker_iface_t *ucp_iface;
ucs_status_t status;
uct_iov_t iov;

if (length < ctx->tm.post_thresh) {
/* Message is smaller than post threshold or offloading is disabled. */
goto skip;
}

if (!UCP_DT_IS_CONTIG(req->recv.datatype)) {
/* Non-contig buffers not supported yet. */
goto skip;
return 0;
}

if ((ctx->config.tag_sender_mask & req->recv.tag_mask) !=
ctx->config.tag_sender_mask) {
        /* Wildcard receive.
         * TODO: add a check that only offload-capable ifaces are present;
         * in that case the tag can be posted to HW as well. */
goto skip;
return 0;
}

if (ctx->tm.sw_req_count) {
/* There are some requests which must be completed in SW. Do not post
* tags to HW until they are completed. */
goto skip;
return 0;
}

ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces,
Expand All @@ -99,7 +94,7 @@ void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req)
req->recv.length, req->recv.datatype,
&req->recv.state);
if (status != UCS_OK) {
goto skip;
return 0;
}

req->recv.uct_ctx.tag_consumed_cb = ucp_tag_offload_tag_consumed;
Expand All @@ -115,16 +110,12 @@ void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req)
&req->recv.uct_ctx);
if (status != UCS_OK) {
/* No more matching entries in the transport. */
goto skip;
return 0;
}
req->flags |= UCP_REQUEST_FLAG_OFFLOADED;
ucs_trace_req("recv request %p (%p) was posted to transport (rsc %d)",
req, req + 1, ucp_iface->rsc_index);
return;

skip:
req->flags |= UCP_REQUEST_FLAG_BLOCK_OFFLOAD;
++ctx->tm.sw_req_count;
return 1;
}

static size_t ucp_tag_offload_pack_eager(void *dest, void *arg)
Expand Down
14 changes: 13 additions & 1 deletion src/ucp/tag/offload.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ ucs_status_t ucp_tag_offload_unexp_eager(void *arg, void *data, size_t length,

void ucp_tag_offload_cancel(ucp_context_t *context, ucp_request_t *req, int force);

void ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req);
int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req);

/* Try to post an expected receive request to the transport for HW tag
 * matching. ucp_tag_offload_post() returns nonzero only when the request was
 * actually handed to the transport; otherwise (message below the post
 * threshold, or the slow-path post declined it), fall back to SW matching:
 * mark the request so it blocks offload and count it, which keeps further
 * HW posting disabled until all such SW requests complete. */
static UCS_F_ALWAYS_INLINE void
ucp_tag_offload_try_post(ucp_context_t *ctx, ucp_request_t *req)
{
    if (ucs_unlikely(req->recv.length >= ctx->tm.post_thresh) &&
        ucp_tag_offload_post(ctx, req)) {
        return; /* posted to the transport - nothing else to do */
    }

    /* SW fallback path */
    req->flags |= UCP_REQUEST_FLAG_BLOCK_OFFLOAD;
    ++ctx->tm.sw_req_count;
}

#endif
4 changes: 3 additions & 1 deletion src/ucp/tag/tag_match.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ void ucp_tag_exp_remove(ucp_tag_match_t *tm, ucp_request_t *req)

ucs_queue_for_each_safe(qreq, iter, queue, recv.queue) {
if (qreq == req) {
ucp_tag_offload_cancel(ctx, req, 0);
if (req->flags & UCP_REQUEST_FLAG_OFFLOADED) {
ucp_tag_offload_cancel(ctx, req, 0);
}
ucs_queue_del_iter(queue, iter);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/tag_match.inl
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ ucp_tag_unexp_desc_release(ucp_recv_desc_t *rdesc)
ucs_trace_req("release receive descriptor %p", rdesc);
if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_UCT_DESC)) {
/* uct desc is slowpath */
if (rdesc->flags & UCP_RECV_DESC_FLAG_OFFLOAD) {
if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_OFFLOAD)) {
uct_iface_release_desc(rdesc);
} else {
uct_iface_release_desc((char*)rdesc - sizeof(ucp_eager_hdr_t));
Expand Down
11 changes: 3 additions & 8 deletions src/ucp/tag/tag_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size,
ucp_tag_log_match(recv_tag, rdesc->length - rdesc->hdr_len, req, tag,
tag_mask, req->recv.state.offset, "unexpected");
ucp_tag_unexp_remove(rdesc);
req->flags |= UCP_REQUEST_FLAG_MATCHED;

if (rdesc->flags & UCP_RECV_DESC_FLAG_EAGER) {
UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0);
Expand Down Expand Up @@ -210,13 +209,9 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count,
req->recv.cb = cb;
ucp_tag_exp_push(&context->tm, queue, req);

if (!(req->flags & UCP_REQUEST_FLAG_MATCHED)) {
/* If offload supported, post this tag to transport as well.
* Check for match flag, because it may be a part of already
* matched message in AM protocol (thus we do not need to post
* tag to transport, because it is already being handled by AM). */
ucp_tag_offload_post(worker->context, req);
}
/* If offload supported, post this tag to transport as well.
* TODO: need to distinguish the cases when posting is not needed. */
ucp_tag_offload_try_post(worker->context, req);
ucs_trace_req("%s returning expected request %p (%p)", debug_name, req,
req + 1);
}
Expand Down
5 changes: 5 additions & 0 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,11 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned address_count,
ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
}

/* Cache tag offload state in the flags for fast-path */
if (ucp_ep_is_tag_offload_enabled(ucp_ep_config(ep))) {
ep->flags |= UCP_EP_FLAG_TAG_OFFLOAD_ENABLED;
}

return UCS_OK;

err:
Expand Down

0 comments on commit b4ac8be

Please sign in to comment.