Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/CORE: Complete outstanding RNDV reqs after all UCT lanes are destroyed #227

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ void ucp_ep_cleanup_lanes(ucp_ep_h ep)
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
ep->uct_eps[lane] = NULL;
}

ucp_ep_complete_rndv_reqs(ep);
}

static void ucp_worker_matchq_purge(ucp_tag_frag_match_t *matchq)
Expand Down Expand Up @@ -935,7 +937,6 @@ void ucp_ep_disconnected(ucp_ep_h ep, ucs_status_t status, int force)
ucp_stream_ep_cleanup(ep, status);
ucp_am_ep_cleanup(ep);
ucp_ep_cleanup_unexp(ep);
ucp_ep_complete_rndv_reqs(ep);

ep->flags &= ~UCP_EP_FLAG_USED;

Expand Down
6 changes: 5 additions & 1 deletion src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,11 @@ void ucp_request_handle_send_error(ucp_request_t *req, ucs_status_t status)
}
} else {
if (req->flags & UCP_REQUEST_FLAG_SEND_RNDV) {
ucp_rndv_complete_send(req, UCS_ERR_CANCELED, "rndv_flush");
if (req->flags & UCP_REQUEST_FLAG_RNDV_RTS_SENT) {
ucp_rndv_req_add_to_cancelled_list(req, status);
} else {
ucp_rndv_complete_send(req, status, "rndv_flush");
}
} else {
ucp_request_complete_send(req, status);
}
Expand Down
40 changes: 29 additions & 11 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status,
ucp_worker_h worker;
khiter_t iter;

ucs_assertv(!(sreq->flags & UCP_REQUEST_FLAG_COMPLETED), "req %p", sreq);

ucp_request_send_generic_dt_finish(sreq);
ucp_request_send_buffer_dereg(sreq);
if (sreq->flags & UCP_REQUEST_FLAG_CANCELED) {
ucs_list_del(&sreq->send.list);
sreq->flags &= ~UCP_REQUEST_FLAG_CANCELED;
}

/* remove from rndv sreqs hash */
Expand Down Expand Up @@ -202,6 +205,22 @@ size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
return sizeof(*rndv_rts_hdr) + packed_rkey_size;
}

/**
 * Flag a rendezvous send request as cancelled and queue it on the owning
 * worker's list of rendezvous requests.
 *
 * The request must not be completed yet (checked by assertion). If the
 * request already carries UCP_REQUEST_FLAG_CANCELED, nothing is changed:
 * the previously stored status is kept and the request is not queued again.
 *
 * @param sreq    Rendezvous send request to cancel.
 * @param status  Status to store in the request.
 */
void ucp_rndv_req_add_to_cancelled_list(ucp_request_t *sreq,
                                        ucs_status_t status)
{
    ucs_assertv(!(sreq->flags & UCP_REQUEST_FLAG_COMPLETED), "req %p", sreq);

    if (!(sreq->flags & UCP_REQUEST_FLAG_CANCELED)) {
        /* First cancellation of this request: mark it, remember the status
         * and link it at the tail of the worker's rndv_reqs_list */
        sreq->flags |= UCP_REQUEST_FLAG_CANCELED;
        sreq->status = status;
        ucs_list_add_tail(&sreq->send.ep->worker->rndv_reqs_list,
                          &sreq->send.list);
        ucs_trace_req("ep %p: %p was canceled", sreq->send.ep, sreq);
    }
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rts, (self),
uct_pending_req_t *self)
{
Expand Down Expand Up @@ -244,7 +263,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self),
{
ucp_request_t *sreq = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_h ep = sreq->send.ep;
ucp_memcpy_pack_context_t ctx;;
ucp_memcpy_pack_context_t ctx;
ucs_status_t status;
ucp_rndv_rts_hdr_t rndv_rts_hdr;
ssize_t packed_len;

Expand All @@ -261,17 +281,14 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self),

packed_len = uct_ep_am_bcopy(ep->uct_eps[sreq->send.lane],
UCP_AM_ID_RNDV_RTS, ucp_memcpy_pack, &ctx, 0);
if (packed_len >= 0) {
sreq->flags |= UCP_REQUEST_FLAG_CANCELED;
ucs_list_add_tail(&ep->worker->rndv_reqs_list, &sreq->send.list);
return UCS_OK;
} else if (packed_len == UCS_ERR_NO_RESOURCE) {
if (packed_len == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
} else {
ucp_rndv_complete_send(sreq, (ucs_status_t)packed_len,
"progress_rndv_cancel");
return UCS_OK;
}

status = (packed_len >= 0) ? UCS_ERR_CANCELED : (ucs_status_t)packed_len;
ucp_rndv_req_add_to_cancelled_list(sreq, status);

return UCS_OK;
}

static size_t ucp_tag_rndv_rtr_pack(void *dest, void *arg)
Expand Down Expand Up @@ -421,7 +438,8 @@ void ucp_ep_complete_rndv_reqs(ucp_ep_h ep)

ucs_list_for_each_safe(sreq, tmp, &worker->rndv_reqs_list, send.list) {
if (sreq->send.ep == ep) {
ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED,
ucs_assert(UCS_STATUS_IS_ERR(sreq->status));
ucp_rndv_complete_send(sreq, sreq->status,
"ep_closed_rndv_cancel");
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/tag/rndv.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length,

size_t ucp_tag_rndv_rts_pack(void *dest, void *arg);

/**
 * Mark a rendezvous send request as cancelled, store @a status in it and
 * append it to the rndv_reqs_list of the worker owning the request's
 * endpoint. Has no effect if the request is already flagged as cancelled.
 * The request must not be completed yet.
 *
 * @param sreq    Rendezvous send request to cancel.
 * @param status  Status to store in the request.
 */
void ucp_rndv_req_add_to_cancelled_list(ucp_request_t *sreq,
ucs_status_t status);

ucs_status_t ucp_tag_rndv_reg_send_buffer(ucp_request_t *sreq);

void ucp_ep_complete_rndv_reqs(ucp_ep_h ep);
Expand Down