Skip to content

Commit

Permalink
UCP/RNDV: Cancel RNDV operations after all UCT EPs are closed
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Dec 7, 2021
1 parent c7b048c commit 2f82417
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ void ucp_ep_cleanup_lanes(ucp_ep_h ep)
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
ep->uct_eps[lane] = NULL;
}

ucp_ep_complete_rndv_reqs(ep);
}

static void ucp_worker_matchq_purge(ucp_tag_frag_match_t *matchq)
Expand Down Expand Up @@ -935,7 +937,6 @@ void ucp_ep_disconnected(ucp_ep_h ep, ucs_status_t status, int force)
ucp_stream_ep_cleanup(ep, status);
ucp_am_ep_cleanup(ep);
ucp_ep_cleanup_unexp(ep);
ucp_ep_complete_rndv_reqs(ep);

ep->flags &= ~UCP_EP_FLAG_USED;

Expand Down
30 changes: 19 additions & 11 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
return sizeof(*rndv_rts_hdr) + packed_rkey_size;
}

/**
 * Mark a rendezvous send request as canceled without completing it.
 *
 * The request is appended to the rndv_reqs_list of the worker that owns the
 * request's endpoint, tagged with UCP_REQUEST_FLAG_CANCELED, and @a status is
 * stored in the request. This function does not complete the request.
 *
 * @param sreq    Send request to cancel; sreq->send.ep must be valid.
 * @param status  Status to store in the request. NOTE(review): presumably this
 *                must be an error code, since the visible part of
 *                ucp_ep_complete_rndv_reqs() asserts that on the stored
 *                status - confirm against all callers.
 */
static void ucp_rndv_req_cancel(ucp_request_t *sreq, ucs_status_t status)
{
    /* Park the request on the worker-wide list of canceled RNDV requests */
    ucs_list_add_tail(&sreq->send.ep->worker->rndv_reqs_list,
                      &sreq->send.list);

    /* Remember why the request was canceled and flag it accordingly */
    sreq->flags  = sreq->flags | UCP_REQUEST_FLAG_CANCELED;
    sreq->status = status;

    ucs_trace_req("ep %p: %p was canceled", sreq->send.ep, sreq);
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rts, (self),
uct_pending_req_t *self)
{
Expand Down Expand Up @@ -234,7 +243,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rts, (self),
return UCS_ERR_NO_RESOURCE;
} else {
ucs_assert(UCS_STATUS_IS_ERR(status));
ucp_rndv_complete_send(sreq, status, "rts_cancel");
ucp_rndv_req_cancel(sreq, status);
return UCS_OK;
}
}
Expand All @@ -244,7 +253,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self),
{
ucp_request_t *sreq = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_h ep = sreq->send.ep;
ucp_memcpy_pack_context_t ctx;;
ucp_memcpy_pack_context_t ctx;
ucs_status_t status;
ucp_rndv_rts_hdr_t rndv_rts_hdr;
ssize_t packed_len;

Expand All @@ -261,15 +271,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self),

packed_len = uct_ep_am_bcopy(ep->uct_eps[sreq->send.lane],
UCP_AM_ID_RNDV_RTS, ucp_memcpy_pack, &ctx, 0);
if (packed_len >= 0) {
sreq->flags |= UCP_REQUEST_FLAG_CANCELED;
ucs_list_add_tail(&ep->worker->rndv_reqs_list, &sreq->send.list);
return UCS_OK;
} else if (packed_len == UCS_ERR_NO_RESOURCE) {
if (packed_len == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
} else {
ucp_rndv_complete_send(sreq, (ucs_status_t)packed_len,
"progress_rndv_cancel");
status = (packed_len >= 0) ? UCS_ERR_CANCELED :
(ucs_status_t)packed_len;
ucp_rndv_req_cancel(sreq, status);
return UCS_OK;
}
}
Expand Down Expand Up @@ -404,7 +411,7 @@ void ucp_tag_rndv_cancel(ucp_request_t *sreq)
{
if (!(sreq->send.ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED)) {
if (sreq->flags & UCP_REQUEST_FLAG_RNDV_RTS_SENT) {
ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED, "rndv_cancel");
ucp_rndv_req_cancel(sreq, UCS_ERR_CANCELED);
}
} else {
sreq->send.uct.func = ucp_proto_progress_rndv_cancel;
Expand All @@ -421,7 +428,8 @@ void ucp_ep_complete_rndv_reqs(ucp_ep_h ep)

ucs_list_for_each_safe(sreq, tmp, &worker->rndv_reqs_list, send.list) {
if (sreq->send.ep == ep) {
ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED,
ucs_assert(UCS_STATUS_IS_ERR(sreq->status));
ucp_rndv_complete_send(sreq, sreq->status,
"ep_closed_rndv_cancel");
}
}
Expand Down

0 comments on commit 2f82417

Please sign in to comment.