From 9de9fbc12a658b74f03a82e17ecb86e8317da037 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Tue, 7 Dec 2021 18:44:14 +0200 Subject: [PATCH] UCP/RNDV: Cancel RNDV operations after all UCT EPs are closed --- src/ucp/core/ucp_ep.c | 3 ++- src/ucp/core/ucp_request.c | 6 +++++- src/ucp/tag/rndv.c | 40 +++++++++++++++++++++++++++----------- src/ucp/tag/rndv.h | 3 +++ 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index c4f77e70024..3d9bceb0ab3 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -824,6 +824,8 @@ void ucp_ep_cleanup_lanes(ucp_ep_h ep) for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { ep->uct_eps[lane] = NULL; } + + ucp_ep_complete_rndv_reqs(ep); } static void ucp_worker_matchq_purge(ucp_tag_frag_match_t *matchq) @@ -935,7 +937,6 @@ void ucp_ep_disconnected(ucp_ep_h ep, ucs_status_t status, int force) ucp_stream_ep_cleanup(ep, status); ucp_am_ep_cleanup(ep); ucp_ep_cleanup_unexp(ep); - ucp_ep_complete_rndv_reqs(ep); ep->flags &= ~UCP_EP_FLAG_USED; diff --git a/src/ucp/core/ucp_request.c b/src/ucp/core/ucp_request.c index d061c541d5b..3a583bbdc79 100644 --- a/src/ucp/core/ucp_request.c +++ b/src/ucp/core/ucp_request.c @@ -490,7 +490,11 @@ void ucp_request_handle_send_error(ucp_request_t *req, ucs_status_t status) } } else { if (req->flags & UCP_REQUEST_FLAG_SEND_RNDV) { - ucp_rndv_complete_send(req, UCS_ERR_CANCELED, "rndv_flush"); + if (req->flags & UCP_REQUEST_FLAG_RNDV_RTS_SENT) { + ucp_rndv_req_add_to_cancelled_list(req, status); + } else { + ucp_rndv_complete_send(req, status, "rndv_flush"); + } } else { ucp_request_complete_send(req, status); } diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index da66bdde7cb..7f1afb3b3de 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -44,10 +44,13 @@ void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status, ucp_worker_h worker; khiter_t iter; + ucs_assertv(!(sreq->flags & UCP_REQUEST_FLAG_COMPLETED), "req %p", sreq); + ucp_request_send_generic_dt_finish(sreq); ucp_request_send_buffer_dereg(sreq); if (sreq->flags & UCP_REQUEST_FLAG_CANCELED) { ucs_list_del(&sreq->send.list); + sreq->flags &= ~UCP_REQUEST_FLAG_CANCELED; } /* remove from rndv sreqs hash */ @@ -202,6 +205,22 @@ size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) return sizeof(*rndv_rts_hdr) + packed_rkey_size; } +void ucp_rndv_req_add_to_cancelled_list(ucp_request_t *sreq, + ucs_status_t status) +{ + ucs_assertv(!(sreq->flags & UCP_REQUEST_FLAG_COMPLETED), "req %p", sreq); + + if (sreq->flags & UCP_REQUEST_FLAG_CANCELED) { + return; /* already cancelled */ + } + + sreq->status = status; + sreq->flags |= UCP_REQUEST_FLAG_CANCELED; + ucs_list_add_tail(&sreq->send.ep->worker->rndv_reqs_list, + &sreq->send.list); + ucs_trace_req("ep %p: %p was canceled", sreq->send.ep, sreq); +} + UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rts, (self), uct_pending_req_t *self) { @@ -244,7 +263,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self), { ucp_request_t *sreq = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_h ep = sreq->send.ep; - ucp_memcpy_pack_context_t ctx;; + ucp_memcpy_pack_context_t ctx; + ucs_status_t status; ucp_rndv_rts_hdr_t rndv_rts_hdr; ssize_t packed_len; @@ -261,17 +281,14 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self), packed_len = uct_ep_am_bcopy(ep->uct_eps[sreq->send.lane], UCP_AM_ID_RNDV_RTS, ucp_memcpy_pack, &ctx, 0); - if (packed_len >= 0) { - sreq->flags |= UCP_REQUEST_FLAG_CANCELED; - ucs_list_add_tail(&ep->worker->rndv_reqs_list, &sreq->send.list); - return UCS_OK; - } else if (packed_len == UCS_ERR_NO_RESOURCE) { + if (packed_len == UCS_ERR_NO_RESOURCE) { return UCS_ERR_NO_RESOURCE; - } else { - ucp_rndv_complete_send(sreq, (ucs_status_t)packed_len, - "progress_rndv_cancel"); - return UCS_OK; } + + status = (packed_len >= 0) ? UCS_ERR_CANCELED : (ucs_status_t)packed_len; + ucp_rndv_req_add_to_cancelled_list(sreq, status); + + return UCS_OK; } static size_t ucp_tag_rndv_rtr_pack(void *dest, void *arg) @@ -421,7 +438,8 @@ void ucp_ep_complete_rndv_reqs(ucp_ep_h ep) ucs_list_for_each_safe(sreq, tmp, &worker->rndv_reqs_list, send.list) { if (sreq->send.ep == ep) { - ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED, + ucs_assert(UCS_STATUS_IS_ERR(sreq->status)); + ucp_rndv_complete_send(sreq, sreq->status, "ep_closed_rndv_cancel"); } } diff --git a/src/ucp/tag/rndv.h b/src/ucp/tag/rndv.h index bd5b53279f3..374e512737b 100644 --- a/src/ucp/tag/rndv.h +++ b/src/ucp/tag/rndv.h @@ -65,6 +65,9 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, size_t ucp_tag_rndv_rts_pack(void *dest, void *arg); +void ucp_rndv_req_add_to_cancelled_list(ucp_request_t *sreq, + ucs_status_t status); + ucs_status_t ucp_tag_rndv_reg_send_buffer(ucp_request_t *sreq); void ucp_ep_complete_rndv_reqs(ucp_ep_h ep);