Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/EP: Cleanup proto reqs on failure #5821

Merged
merged 1 commit into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
memset(&ucp_ep_ext_gen(ep)->ep_match, 0,
sizeof(ucp_ep_ext_gen(ep)->ep_match));

ucs_hlist_head_init(&ucp_ep_ext_gen(ep)->proto_reqs);
ucp_stream_ep_init(ep);
ucp_am_ep_init(ep);

Expand Down Expand Up @@ -605,6 +606,8 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker,
goto err_cleanup_lanes;
}

ucp_ep_flush_state_reset(ep);

*ep_p = ep;
return UCS_OK;

Expand Down Expand Up @@ -963,6 +966,7 @@ void ucp_ep_disconnected(ucp_ep_h ep, int force)

ucp_stream_ep_cleanup(ep);
ucp_am_ep_cleanup(ep);
ucp_ep_reqs_purge(ep, UCS_ERR_CANCELED);

ucp_ep_update_flags(ep, 0, UCP_EP_FLAG_USED);

Expand Down Expand Up @@ -2667,3 +2671,72 @@ void ucp_ep_do_keepalive(ucp_ep_h ep, ucp_lane_map_t *lane_map)
*lane_map &= ~UCS_BIT(lane);
}
}

static void ucp_ep_req_purge(ucp_ep_h ucp_ep, ucp_request_t *req,
ucs_status_t status, int recursive)
dmitrygx marked this conversation as resolved.
Show resolved Hide resolved
{
ucp_trace_req(req, "purged with status %s (%d) on ep %p",
ucs_status_string(status), status, ucp_ep);

if (req->id != UCP_REQUEST_ID_INVALID) {
dmitrygx marked this conversation as resolved.
Show resolved Hide resolved
ucp_request_id_release(req);
}
Comment on lines +2681 to +2683
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible some of the protocols cleanup flow would try to release the request it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch, Yossi
currently, we don't have such protocols, but they could be implemented in the future
fixed this comment by moving checking/releasing of request ID at the end of the function, but need to implement some hack (save and remove RELEASE flag before calling request complete procedure)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "extract" the id from the request to a local variable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then protocols which expect that request ID is valid will fail ucp_request_id_release()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decided to not do it for now


if (req->flags & (UCP_REQUEST_FLAG_SEND_AM | UCP_REQUEST_FLAG_SEND_TAG)) {
dmitrygx marked this conversation as resolved.
Show resolved Hide resolved
ucs_assert(req->super_req == NULL);
ucs_assert(req->send.ep == ucp_ep);
ucp_request_complete_and_dereg_send(req, status);
} else if (req->flags & UCP_REQUEST_FLAG_RECV_AM) {
ucs_assert(req->super_req == NULL);
ucs_assert(recursive); /* Mustn't be directly contained in an EP list
* of tracking requests */
ucp_request_complete_am_recv(req, status);
} else if (req->flags & UCP_REQUEST_FLAG_RECV_TAG) {
ucs_assert(req->super_req == NULL);
ucs_assert(recursive); /* Mustn't be directly contained in an EP list
* of tracking requests */
ucp_request_complete_tag_recv(req, status);
} else if (req->flags & UCP_REQUEST_FLAG_RNDV_FRAG) {
ucs_assert(req->super_req != NULL);
ucs_assert(req->send.ep == ucp_ep);
ucs_assert(recursive); /* Mustn't be directly contained in an EP list
* of tracking requests */

/* It means that purging started from a request responsible for sending
* RTR, so a request is responsible for copying data from staging buffer
* and it uses a receive part of a request */
req->super_req->recv.remaining -= req->recv.length;
if (req->super_req->recv.remaining == 0) {
ucp_ep_req_purge(ucp_ep, req->super_req, status, 1);
}
dmitrygx marked this conversation as resolved.
Show resolved Hide resolved

ucp_request_put(req);
} else {
ucs_assert(req->super_req != NULL);
ucs_assert(req->send.ep == ucp_ep);

ucp_ep_req_purge(ucp_ep, req->super_req, status, 1);
ucp_request_put(req);
}
}

void ucp_ep_reqs_purge(ucp_ep_h ucp_ep, ucs_status_t status)
{
ucs_hlist_head_t *proto_reqs = &ucp_ep_ext_gen(ucp_ep)->proto_reqs;
ucp_request_t *req;

while (!ucs_hlist_is_empty(proto_reqs)) {
req = ucs_hlist_head_elem(proto_reqs, ucp_request_t, send.list);
ucp_ep_req_purge(ucp_ep, req, status, 0);
}

if (/* Flush state is already valid (i.e. EP doesn't exist on matching
* context) and not invalidated yet, also remote EP ID is already set */
!(ucp_ep->flags &
(UCP_EP_FLAG_ON_MATCH_CTX | UCP_EP_FLAG_CLOSE_REQ_VALID))) {
ucs_hlist_for_each_extract(req, &ucp_ep_flush_state(ucp_ep)->reqs,
send.list) {
ucp_ep_flush_request_ff(req, status);
}
}
}
11 changes: 11 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ typedef struct {
ucp_ep_flush_state_t flush_state; /* Remote completion status */
};
ucp_ep_ext_control_t *control_ext; /* Control data path extension */
/* List of requests which are waiting for remote completion */
ucs_hlist_head_t proto_reqs;
} ucp_ep_ext_gen_t;


Expand Down Expand Up @@ -660,4 +662,13 @@ ucs_status_t ucp_ep_do_uct_ep_keepalive(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
*/
void ucp_ep_do_keepalive(ucp_ep_h ep, ucp_lane_map_t *lane_map);

/**
* @brief Purge flush and protocol requests scheduled on a given UCP endpoint.
*
* @param [in] ucp_ep Endpoint object on which requests should be
* purged.
* @param [in] status Completion status.
*/
void ucp_ep_reqs_purge(ucp_ep_h ucp_ep, ucs_status_t status);

#endif
31 changes: 15 additions & 16 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ struct ucp_request {
void *buffer; /* Send buffer */
ucp_request_callback_t flushed_cb; /* Called when flushed */
};
ucp_datatype_t datatype; /* Send type */
size_t length; /* Total length, in bytes */
ucp_send_nbx_callback_t cb; /* Completion callback */
ucp_datatype_t datatype; /* Send type */
size_t length; /* Total length, in bytes */
ucp_send_nbx_callback_t cb; /* Completion callback */
ucs_hlist_link_t list; /* Element in the per-EP list of UCP
flush/proto requests */

const ucp_proto_config_t *proto_config; /* Selected protocol for the request */

Expand Down Expand Up @@ -178,9 +180,8 @@ struct ucp_request {
} msg_proto;

struct {
ucs_ptr_map_key_t sreq_id; /* Send request ID */
uint64_t remote_addr; /* Remote address */
ucp_rkey_h rkey; /* Remote memory key */
uint64_t remote_addr; /* Remote address */
ucp_rkey_h rkey; /* Remote memory key */
} rma;

struct {
Expand Down Expand Up @@ -241,16 +242,14 @@ struct ucp_request {
} rndv_rtr;

struct {
ucs_hlist_link_t list_elem; /* Element in the per-EP list of UCP
flush requests */
unsigned uct_flags; /* Flags to pass to @ref uct_ep_flush */
uct_worker_cb_id_t prog_id; /* Progress callback ID */
uint32_t cmpl_sn; /* Sequence number of the remote completion
this request is waiting for */
uint8_t sw_started;
uint8_t sw_done;
uint8_t num_lanes; /* How many lanes are being flushed */
ucp_lane_map_t started_lanes;/* Which lanes need were flushed */
unsigned uct_flags; /* Flags to pass to @ref uct_ep_flush */
uct_worker_cb_id_t prog_id; /* Progress callback ID */
uint32_t cmpl_sn; /* Sequence number of the remote completion
this request is waiting for */
uint8_t sw_started;
uint8_t sw_done;
uint8_t num_lanes; /* How many lanes are being flushed */
ucp_lane_map_t started_lanes; /* Which lanes need were flushed */
} flush;

struct {
Expand Down
59 changes: 58 additions & 1 deletion src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,31 @@ ucp_request_can_complete_stream_recv(ucp_request_t *req)
return 1;
}


static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_request_mem_alloc(const char *name)
{
ucp_request_t *req = (ucp_request_t*)ucs_malloc(sizeof(*req), name);

if (ucs_unlikely(req == NULL)) {
return NULL;
}

ucs_trace_req("allocated request %p (%s)", req, name);
ucp_request_id_reset(req);
UCS_PROFILE_REQUEST_NEW(req, "ucp_request", 0);

return req;
}

static UCS_F_ALWAYS_INLINE void ucp_request_mem_free(ucp_request_t *req)
{
UCS_PROFILE_REQUEST_FREE(req);
ucp_request_id_check(req, ==, UCP_REQUEST_ID_INVALID);
ucs_trace_req("freed request %p", req);
ucs_free(req);
}

/*
* @return Whether completed.
* *req_status if filled with the completion status if completed.
Expand Down Expand Up @@ -816,6 +841,13 @@ static UCS_F_ALWAYS_INLINE void ucp_request_id_alloc(ucp_request_t *req)
{
ucp_request_id_check(req, ==, UCP_REQUEST_ID_INVALID);
ucp_ep_ptr_id_alloc(req->send.ep, req, &req->id);

/* TODO: combine checks for err_mode and PTR MAP indirect flag */
if (ucs_unlikely(ucp_ep_config(req->send.ep)->key.err_mode ==
UCP_ERR_HANDLING_MODE_PEER)) {
ucs_hlist_add_tail(&ucp_ep_ext_gen(req->send.ep)->proto_reqs,
&req->send.list);
}
}

static UCS_F_ALWAYS_INLINE ucs_ptr_map_key_t
Expand All @@ -825,6 +857,18 @@ ucp_request_get_id(const ucp_request_t *req)
return req->id;
}

static UCS_F_ALWAYS_INLINE void ucp_request_ep_list_del(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;

ucs_assert(ep != NULL);
/* TODO: combine checks for err_mode and PTR MAP indirect flag */
if (ucs_unlikely(ucp_ep_config(ep)->key.err_mode ==
UCP_ERR_HANDLING_MODE_PEER)) {
ucs_hlist_del(&ucp_ep_ext_gen(ep)->proto_reqs, &req->send.list);
}
}

static UCS_F_ALWAYS_INLINE void
ucp_request_id_release(ucp_request_t *req)
{
Expand All @@ -835,6 +879,7 @@ ucp_request_id_release(ucp_request_t *req)
status = ucs_ptr_map_del(&worker->ptr_map, req->id);
ucs_assertv(status == UCS_OK, "req %p: failed to release id", req);
ucp_request_id_reset(req);
ucp_request_ep_list_del(req);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
Expand All @@ -844,19 +889,21 @@ ucp_request_get_by_id(ucp_worker_h worker, ucs_ptr_map_key_t id,
ucs_status_t status;
void *ptr;

ucs_assert(id != UCP_REQUEST_ID_INVALID);

status = ucs_ptr_map_get(&worker->ptr_map, id, extract, &ptr);
if (ucs_unlikely(status != UCS_OK)) {
return status;
}

*req_p = (ucp_request_t*)ptr;
ucp_request_id_check(*req_p, ==, id);

if (extract) {
/* If request ID was released, then need to reset the request ID to use
* the value for checking whether the request ID should be put to PTR
* map or not in case of error handling */
ucp_request_id_reset(*req_p);
ucp_request_ep_list_del(*req_p);
}

return status;
Expand Down Expand Up @@ -895,6 +942,16 @@ ucp_request_invoke_uct_completion_success(ucp_request_t *req)
return UCS_OK;
}

/* The function can be used to complete any UCP send request */
static UCS_F_ALWAYS_INLINE void
brminich marked this conversation as resolved.
Show resolved Hide resolved
ucp_request_complete_and_dereg_send(ucp_request_t *sreq, ucs_status_t status)
{
ucs_assert(!sreq->send.ep->worker->context->config.ext.proto_enable);
ucp_request_send_generic_dt_finish(sreq);
ucp_request_send_buffer_dereg(sreq);
ucp_request_complete_send(sreq, status);
}


#define UCP_REQUEST_GET_BY_ID(_req_p, _worker, _req_id, _extract, \
_action, _fmt_str, ...) \
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,9 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg)
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_FAILED);

ucp_ep_discard_lanes(ucp_ep, status);
ucp_ep_reqs_purge(ucp_ep, status);
ucp_stream_ep_cleanup(ucp_ep);

if (ucp_ep->flags & UCP_EP_FLAG_USED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_CLOSED);
Expand Down
7 changes: 4 additions & 3 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
ucs_trace_req("flush request %p remote completions done", req);
} else {
req->send.flush.cmpl_sn = flush_state->send_sn;
ucs_hlist_add_tail(&flush_state->reqs, &req->send.flush.list_elem);
ucs_trace_req("added flush request %p to ep remote completion queue"
" with sn %d", req, req->send.flush.cmpl_sn);
ucs_hlist_add_tail(&flush_state->reqs, &req->send.list);
ucs_trace_req("added flush request %p to ep remote completion"
" queue with sn %d",
req, req->send.flush.cmpl_sn);
}
}
req->send.flush.sw_started = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/rma/rma.inl
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static inline void ucp_ep_rma_remote_request_completed(ucp_ep_t *ep)
ucp_worker_flush_ops_count_dec(ep->worker);
++flush_state->cmpl_sn;

ucs_hlist_for_each_extract_if(req, &flush_state->reqs, send.flush.list_elem,
ucs_hlist_for_each_extract_if(req, &flush_state->reqs, send.list,
UCS_CIRCULAR_COMPARE32(
req->send.flush.cmpl_sn, <=,
flush_state->cmpl_sn)) {
Expand Down
15 changes: 4 additions & 11 deletions src/ucp/rndv/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,6 @@ ucp_rndv_adjust_zcopy_length(size_t min_zcopy, size_t max_zcopy, size_t align,
return result_length;
}

static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status)
{
ucs_assert(!sreq->send.ep->worker->context->config.ext.proto_enable);
ucp_request_send_generic_dt_finish(sreq);
ucp_request_send_buffer_dereg(sreq);
ucp_request_complete_send(sreq, status);
}

void ucp_rndv_req_send_ack(ucp_request_t *ack_req, ucp_request_t *req,
ucs_ptr_map_key_t remote_req_id, ucs_status_t status,
ucp_am_id_t am_id, const char *ack_str)
Expand Down Expand Up @@ -1478,7 +1470,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler,
if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) {
ucp_tag_offload_cancel_rndv(sreq);
}
ucp_rndv_complete_send(sreq, rep_hdr->status);

ucp_request_complete_and_dereg_send(sreq, rep_hdr->status);
dmitrygx marked this conversation as resolved.
Show resolved Hide resolved
return UCS_OK;
}

Expand All @@ -1495,7 +1488,7 @@ ucs_status_t ucp_rndv_rts_handle_status_from_pending(ucp_request_t *sreq,
}

ucp_request_id_release(sreq);
ucp_rndv_complete_send(sreq, status);
ucp_request_complete_and_dereg_send(sreq, status);
yosefe marked this conversation as resolved.
Show resolved Hide resolved
}

return UCS_OK;
Expand Down Expand Up @@ -1549,7 +1542,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_am_bcopy, (self),
return UCS_ERR_NO_RESOURCE;
}

ucp_rndv_complete_send(sreq, status);
ucp_request_complete_and_dereg_send(sreq, status);

return UCS_OK;
}
Expand Down
7 changes: 4 additions & 3 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ ucs_status_t ucp_wireup_msg_progress(uct_pending_req_t *self)

out_free_req:
ucs_free(req->send.buffer);
ucs_free(req);
ucp_request_mem_free(req);
out:
UCS_ASYNC_UNBLOCK(&ep->worker->async);
return status;
Expand Down Expand Up @@ -224,7 +224,7 @@ ucp_wireup_msg_send(ucp_ep_h ep, uint8_t type, const ucp_tl_bitmap_t *tl_bitmap,
/* We cannot allocate from memory pool because it's not thread safe
* and this function may be called from any thread
*/
req = ucs_malloc(sizeof(*req), "wireup_msg_req");
req = ucp_request_mem_alloc("wireup_msg_req");
if (req == NULL) {
ucs_error("failed to allocate request for sending WIREUP message");
return UCS_ERR_NO_MEMORY;
Expand All @@ -240,7 +240,7 @@ ucp_wireup_msg_send(ucp_ep_h ep, uint8_t type, const ucp_tl_bitmap_t *tl_bitmap,
&req->send.wireup, &req->send.buffer,
&req->send.length);
if (status != UCS_OK) {
ucs_free(req);
ucp_request_mem_free(req);
return status;
}

Expand Down Expand Up @@ -704,6 +704,7 @@ ucp_wireup_send_ep_removed(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
}

ucp_ep_update_remote_id(reply_ep, msg->src_ep_id);
ucp_ep_flush_state_reset(reply_ep);
status = ucp_wireup_msg_send(reply_ep, UCP_WIREUP_MSG_EP_REMOVED,
&ucp_tl_bitmap_min, NULL);
if (status != UCS_OK) {
Expand Down
Loading