Skip to content

Commit

Permalink
UCP/EP: Cleanup proto requests on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Oct 27, 2020
1 parent 5cdd2e1 commit 69d47f5
Show file tree
Hide file tree
Showing 20 changed files with 300 additions and 139 deletions.
1 change: 1 addition & 0 deletions src/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ noinst_HEADERS = \
rma/rma.h \
rma/rma.inl \
rndv/rndv.h \
rndv/rndv.inl \
tag/eager.h \
tag/tag_rndv.h \
tag/tag_match.h \
Expand Down
6 changes: 4 additions & 2 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ucp/core/ucp_worker.h>
#include <ucp/core/ucp_context.h>
#include <ucp/rndv/rndv.h>
#include <ucp/rndv/rndv.inl>
#include <ucp/proto/proto_am.inl>
#include <ucp/dt/dt.h>
#include <ucp/dt/dt.inl>
Expand Down Expand Up @@ -638,8 +639,9 @@ size_t ucp_am_rndv_rts_pack(void *dest, void *arg)
UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_am_rndv_rts, (self),
uct_pending_req_t *self)
{
return ucp_do_am_bcopy_single(self, UCP_AM_ID_RNDV_RTS,
ucp_am_rndv_rts_pack);
ucp_request_t *sreq = ucs_container_of(self, ucp_request_t, send.uct);
return ucp_rndv_send_rts(sreq, ucp_am_rndv_rts_pack,
sizeof(ucp_am_rndv_rts_hdr_t));
}

static ucs_status_t ucp_am_send_start_rndv(ucp_request_t *sreq)
Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "ucp_context.h"
#include "ucp_request.h"
#include "ucp_worker.h"

#include <ucs/config/parser.h>
#include <ucs/algorithm/crc.h>
Expand Down
104 changes: 75 additions & 29 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,28 +99,31 @@ ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
goto err;
}

ep->cfg_index = UCP_WORKER_CFG_INDEX_NULL;
ep->worker = worker;
ep->am_lane = UCP_NULL_LANE;
ep->flags = 0;
ep->conn_sn = UCP_EP_MATCH_CONN_SN_MAX;
ucp_ep_ext_gen(ep)->user_data = NULL;
ucp_ep_ext_gen(ep)->err_cb = NULL;
ucp_ep_ext_gen(ep)->ids = ucs_malloc(sizeof(ucp_ep_ids_t),
"ep_ids");
if (ucp_ep_ext_gen(ep)->ids == NULL) {
ucs_error("Failed to allocate ep keys");
ep->cfg_index = UCP_WORKER_CFG_INDEX_NULL;
ep->worker = worker;
ep->am_lane = UCP_NULL_LANE;
ep->flags = 0;
ep->conn_sn = UCP_EP_MATCH_CONN_SN_MAX;
ucp_ep_ext_gen(ep)->user_data = NULL;
ucp_ep_ext_gen(ep)->control_ext = ucs_malloc(sizeof(*(ucp_ep_ext_gen(ep)->
control_ext)),
"ep_control_ext");
if (ucp_ep_ext_gen(ep)->control_ext == NULL) {
ucs_error("Failed to allocate ep control extension");
status = UCS_ERR_NO_MEMORY;
goto err_free_ep;
}

ucp_ep_ext_gen(ep)->ids->local = UCP_EP_ID_INVALID;
ucp_ep_ext_gen(ep)->ids->remote = UCP_EP_ID_INVALID;
ucp_ep_ext_control(ep)->err_cb = NULL;
ucp_ep_ext_control(ep)->ids.local = UCP_EP_ID_INVALID;
ucp_ep_ext_control(ep)->ids.remote = UCP_EP_ID_INVALID;

ucs_hlist_head_init(&ucp_ep_ext_gen(ep)->proto_reqs);

UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen(ep)->ep_match) >=
sizeof(ucp_ep_ext_gen(ep)->listener));
UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen(ep)->ep_match) >=
sizeof(ucp_ep_ext_gen(ep)->flush_state));
UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_control(ep)->listener) ==
sizeof(ucp_ep_ext_control(ep)->close_req));
memset(&ucp_ep_ext_gen(ep)->ep_match, 0,
sizeof(ucp_ep_ext_gen(ep)->ep_match));

Expand All @@ -139,7 +142,7 @@ ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
status = UCS_STATS_NODE_ALLOC(&ep->stats, &ucp_ep_stats_class,
worker->stats, "-%p", ep);
if (status != UCS_OK) {
goto err_free_keys;
goto err_free_ep_control_ext;
}

ucs_list_head_init(&ucp_ep_ext_gen(ep)->ep_list);
Expand All @@ -148,8 +151,8 @@ ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
ucs_debug("created ep %p to %s %s", ep, ucp_ep_peer_name(ep), message);
return UCS_OK;

err_free_keys:
ucs_free(ucp_ep_ext_gen(ep)->ids);
err_free_ep_control_ext:
ucs_free(ucp_ep_ext_control(ep));
err_free_ep:
ucs_strided_alloc_put(&worker->ep_alloc, ep);
err:
Expand All @@ -159,7 +162,7 @@ ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
void ucp_ep_destroy_base(ucp_ep_h ep)
{
UCS_STATS_NODE_FREE(ep->stats);
ucs_free(ucp_ep_ext_gen(ep)->ids);
ucs_free(ucp_ep_ext_control(ep));
ucs_strided_alloc_put(&ep->worker->ep_alloc, ep);
}

Expand All @@ -184,7 +187,7 @@ ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,

status = ucs_ptr_map_put(&worker->ptr_map, ep,
!!(ep->flags & UCP_EP_FLAG_INDIRECT_ID),
&ucp_ep_ext_gen(ep)->ids->local);
&ucp_ep_ext_control(ep)->ids.local);
if (status != UCS_OK) {
goto err_destroy_ep_base;
}
Expand All @@ -211,7 +214,6 @@ void ucp_ep_delete(ucp_ep_h ep)
ucp_wireup_msg_ack_cb_pred, ep);
ucp_worker_keepalive_remove_ep(ep);
ucs_list_del(&ucp_ep_ext_gen(ep)->ep_list);
ucs_assert(ucp_ep_ext_gen(ep)->ids->local != UCP_EP_ID_INVALID);
status = ucs_ptr_map_del(&ep->worker->ptr_map, ucp_ep_local_id(ep));
if (status != UCS_OK) {
ucs_warn("ep %p local id 0x%"PRIxPTR": ucs_ptr_map_del failed with status %s",
Expand Down Expand Up @@ -286,8 +288,8 @@ ucp_ep_adjust_params(ucp_ep_h ep, const ucp_ep_params_t *params)
}

if (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLER) {
ucp_ep_ext_gen(ep)->user_data = params->err_handler.arg;
ucp_ep_ext_gen(ep)->err_cb = params->err_handler.cb;
ucp_ep_ext_gen(ep)->user_data = params->err_handler.arg;
ucp_ep_ext_control(ep)->err_cb = params->err_handler.cb;
}

if (params->field_mask & UCP_EP_PARAM_FIELD_USER_DATA) {
Expand Down Expand Up @@ -792,7 +794,7 @@ void ucp_ep_destroy_internal(ucp_ep_h ep)
ucp_ep_cleanup_lanes(ep);
if (ep->flags & UCP_EP_FLAG_TEMPORARY) {
/* it's failed tmp ep of main ep */
ucs_assert(ucp_ep_ext_gen(ep)->ids->local == UCP_EP_ID_INVALID);
ucs_assert(ucp_ep_ext_control(ep)->ids.local == UCP_EP_ID_INVALID);
ucp_ep_destroy_base(ep);
} else {
ucp_ep_delete(ep);
Expand Down Expand Up @@ -888,8 +890,8 @@ static void ucp_ep_set_close_request(ucp_ep_h ep, ucp_request_t *request,
ucs_trace("ep %p: setting close request %p, %s", ep, request, debug_msg);

ucp_ep_flush_state_invalidate(ep);
ucp_ep_ext_gen(ep)->close_req.req = request;
ep->flags |= UCP_EP_FLAG_CLOSE_REQ_VALID;
ucp_ep_ext_control(ep)->close_req.req = request;
ep->flags |= UCP_EP_FLAG_CLOSE_REQ_VALID;
}

static void ucp_ep_close_flushed_callback(ucp_request_t *req)
Expand Down Expand Up @@ -2260,18 +2262,18 @@ void ucp_ep_invoke_err_cb(ucp_ep_h ep, ucs_status_t status)
/* Do not invoke error handler if it's not enabled */
if ((ucp_ep_config(ep)->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) ||
/* error callback is not set */
(ucp_ep_ext_gen(ep)->err_cb == NULL) ||
(ucp_ep_ext_control(ep)->err_cb == NULL) ||
/* the EP has been closed by user, or error callback already called */
(ep->flags & (UCP_EP_FLAG_CLOSED | UCP_EP_FLAG_ERR_HANDLER_INVOKED))) {
return;
}

ucs_assert(ep->flags & UCP_EP_FLAG_USED);
ucs_debug("ep %p: calling user error callback %p with arg %p and status %s",
ep, ucp_ep_ext_gen(ep)->err_cb, ucp_ep_ext_gen(ep)->user_data,
ep, ucp_ep_ext_control(ep)->err_cb, ucp_ep_ext_gen(ep)->user_data,
ucs_status_string(status));
ep->flags |= UCP_EP_FLAG_ERR_HANDLER_INVOKED;
ucp_ep_ext_gen(ep)->err_cb(ucp_ep_ext_gen(ep)->user_data, ep, status);
ucp_ep_ext_control(ep)->err_cb(ucp_ep_ext_gen(ep)->user_data, ep, status);
}

int ucp_ep_config_test_rndv_support(const ucp_ep_config_t *config)
Expand All @@ -2298,3 +2300,47 @@ void ucp_ep_do_keepalive(ucp_ep_h ep, ucp_lane_map_t *lane_map)
}
}
}

void ucp_ep_reqs_purge(ucp_ep_h ucp_ep, ucs_status_t status)
{
uct_ep_h wireup_ep;
ucp_request_t *req;

ucs_hlist_for_each_extract(req, &ucp_ep_ext_gen(ucp_ep)->proto_reqs,
list_elem) {
if (req->flags & (UCP_REQUEST_FLAG_SEND_AM |
UCP_REQUEST_FLAG_SEND_TAG)) {
if (req->flags & UCP_REQUEST_FLAG_OFFLOADED) {
ucp_tag_offload_cancel_rndv(req);
}
ucp_request_complete_send_common(req, status);
} else if (req->flags & (UCP_REQUEST_FLAG_RECV_AM |
UCP_REQUEST_FLAG_RECV_TAG)) {
ucp_request_complete_recv_common(req, status);
} else {
ucs_assert(req->flags & UCP_REQUEST_FLAG_RNDV_FRAG);
ucp_request_put(req);
}
}

if (ucp_ep->flags & (UCP_EP_FLAG_ON_MATCH_CTX | UCP_EP_FLAG_LISTENER |
UCP_EP_FLAG_CLOSE_REQ_VALID)) {
/* flush state is not valid yet or already invalidated */
return;
}

wireup_ep = (!ucp_ep_has_cm_lane(ucp_ep)) ?
ucp_ep->uct_eps[ucp_ep_get_wireup_msg_lane(ucp_ep)] :
ucp_ep->uct_eps[ucp_ep_get_cm_lane(ucp_ep)];
if ((wireup_ep != NULL) && ucp_wireup_ep_test(wireup_ep)) {
/* flush state is not valid yet */
return;
}

ucs_hlist_for_each_extract(req, &ucp_ep_flush_state(ucp_ep)->reqs,
list_elem) {
ucp_ep_flush_request_ff(req, status);
}

ucp_ep_flush_state_invalidate(ucp_ep);
}
41 changes: 28 additions & 13 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ typedef struct ucp_ep {
* Status of protocol-level remote completions
*/
typedef struct {
ucs_queue_head_t reqs; /* Queue of flush requests which
ucs_hlist_head_t reqs; /* Queue of flush requests which
are waiting for remote completion */
uint32_t send_sn; /* Sequence number of sent operations */
uint32_t cmpl_sn; /* Sequence number of completions */
Expand All @@ -393,31 +393,44 @@ typedef struct {
} ucp_ep_ids_t;


/*
* Endpoint extension for generic non fast-path data
*/
/**
* Endpoint extension for control data path
*/
typedef struct {
ucp_ep_ids_t *ids; /* Local and remote IDS, TODO:
ucp_ep_ids_t ids; /* Local and remote IDS, TODO:
remove indirect pointer after
stride allocator improvement */
void *user_data; /* User data associated with ep */
ucs_list_link_t ep_list; /* List entry in worker's all eps list */
ucp_err_handler_cb_t err_cb; /* Error handler */

/* Endpoint match context and remote completion status are mutually exclusive,
* since remote completions are counted only after the endpoint is already
* matched to a remote peer.
/* Endpoint match context is mutually exclusive with:
* - close request, since enpoint is exposed to the user only after
* the endpoint is already matched to a remote peer.
* - listener, since they are not used simultaneously.
*/
union {
ucp_ep_match_elem_t ep_match; /* Matching with remote endpoints */
ucp_ep_flush_state_t flush_state; /* Remove completion status */
ucp_listener_h listener; /* Listener that may be associated with ep */
ucp_ep_close_proto_req_t close_req; /* Close protocol request */
};
} ucp_ep_ext_control_t;


/**
* Endpoint extension for generic non fast-path data
*/
typedef struct {
void *user_data; /* User data associated with ep */
ucs_list_link_t ep_list; /* List entry in worker's all eps list */
ucs_hlist_head_t proto_reqs; /* List of requests which are waiting
for remote completion */
union {
ucp_ep_match_elem_t ep_match; /* Matching with remote endpoints */
ucp_ep_flush_state_t flush_state; /* Remote completion status */
};
ucp_ep_ext_control_t *control_ext; /* Control data path extension */
} ucp_ep_ext_gen_t;


/*
/**
* Endpoint extension for specific protocols
*/
typedef struct {
Expand Down Expand Up @@ -604,4 +617,6 @@ void ucp_ep_flush_request_ff(ucp_request_t *req, ucs_status_t status);
*/
void ucp_ep_do_keepalive(ucp_ep_h ep, ucp_lane_map_t *lane_map);

void ucp_ep_reqs_purge(ucp_ep_h ucp_ep, ucs_status_t status);

#endif
28 changes: 17 additions & 11 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ static UCS_F_ALWAYS_INLINE ucp_ep_flush_state_t* ucp_ep_flush_state(ucp_ep_h ep)
return &ucp_ep_ext_gen(ep)->flush_state;
}

static UCS_F_ALWAYS_INLINE ucp_ep_ext_control_t* ucp_ep_ext_control(ucp_ep_h ep)
{
ucs_assert(ucp_ep_ext_gen(ep)->control_ext != NULL);
return ucp_ep_ext_gen(ep)->control_ext;
}

static UCS_F_ALWAYS_INLINE ucs_ptr_map_key_t ucp_ep_remote_id(ucp_ep_h ep)
{
#if UCS_ENABLE_ASSERT
Expand All @@ -158,13 +164,13 @@ static UCS_F_ALWAYS_INLINE ucs_ptr_map_key_t ucp_ep_remote_id(ucp_ep_h ep)
return UCP_EP_ID_INVALID;
}
#endif
return ucp_ep_ext_gen(ep)->ids->remote;
return ucp_ep_ext_control(ep)->ids.remote;
}

static UCS_F_ALWAYS_INLINE ucs_ptr_map_key_t ucp_ep_local_id(ucp_ep_h ep)
{
ucs_assert(ucp_ep_ext_gen(ep)->ids->local != UCP_EP_ID_INVALID);
return ucp_ep_ext_gen(ep)->ids->local;
ucs_assert(ucp_ep_ext_control(ep)->ids.local != UCP_EP_ID_INVALID);
return ucp_ep_ext_control(ep)->ids.local;
}

/*
Expand All @@ -185,15 +191,15 @@ static inline void ucp_ep_update_remote_id(ucp_ep_h ep,
ucs_ptr_map_key_t remote_id)
{
if (ep->flags & UCP_EP_FLAG_REMOTE_ID) {
ucs_assertv(remote_id == ucp_ep_ext_gen(ep)->ids->remote,
ucs_assertv(remote_id == ucp_ep_ext_control(ep)->ids.remote,
"ep=%p rkey=0x%" PRIxPTR " ep->remote_id=0x%" PRIxPTR,
ep, remote_id, ucp_ep_ext_gen(ep)->ids->remote);
ep, remote_id, ucp_ep_ext_control(ep)->ids.remote);
}

ucs_assert(remote_id != UCP_EP_ID_INVALID);
ucs_trace("ep %p: set remote_id to 0x%" PRIxPTR, ep, remote_id);
ep->flags |= UCP_EP_FLAG_REMOTE_ID;
ucp_ep_ext_gen(ep)->ids->remote = remote_id;
ep->flags |= UCP_EP_FLAG_REMOTE_ID;
ucp_ep_ext_control(ep)->ids.remote = remote_id;
}

static inline const char* ucp_ep_peer_name(ucp_ep_h ep)
Expand All @@ -214,17 +220,17 @@ static inline void ucp_ep_flush_state_reset(ucp_ep_h ep)
ucs_assert(!(ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID) ||
((flush_state->send_sn == 0) &&
(flush_state->cmpl_sn == 0) &&
ucs_queue_is_empty(&flush_state->reqs)));
ucs_hlist_is_empty(&flush_state->reqs)));

ucs_hlist_head_init(&flush_state->reqs);
flush_state->send_sn = 0;
flush_state->cmpl_sn = 0;
ucs_queue_head_init(&flush_state->reqs);
ep->flags |= UCP_EP_FLAG_FLUSH_STATE_VALID;
ep->flags |= UCP_EP_FLAG_FLUSH_STATE_VALID;
}

static inline void ucp_ep_flush_state_invalidate(ucp_ep_h ep)
{
ucs_assert(ucs_queue_is_empty(&ucp_ep_flush_state(ep)->reqs));
ucs_assert(ucs_hlist_is_empty(&ucp_ep_flush_state(ep)->reqs));
ep->flags &= ~UCP_EP_FLAG_FLUSH_STATE_VALID;
}

Expand Down
4 changes: 2 additions & 2 deletions src/ucp/core/ucp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
static unsigned ucp_listener_accept_cb_progress(void *arg)
{
ucp_ep_h ep = arg;
ucp_listener_h listener = ucp_ep_ext_gen(ep)->listener;
ucp_listener_h listener = ucp_ep_ext_control(ep)->listener;

/* NOTE: protect union */
ucs_assert(!(ep->flags & (UCP_EP_FLAG_ON_MATCH_CTX |
Expand Down Expand Up @@ -88,7 +88,7 @@ static unsigned ucp_listener_conn_request_progress(void *arg)
if (listener->accept_cb != NULL) {
if (ep->flags & UCP_EP_FLAG_LISTENER) {
ucs_assert(!(ep->flags & UCP_EP_FLAG_USED));
ucp_ep_ext_gen(ep)->listener = listener;
ucp_ep_ext_control(ep)->listener = listener;
} else {
ep->flags |= UCP_EP_FLAG_USED;
listener->accept_cb(ep, listener->arg);
Expand Down
Loading

0 comments on commit 69d47f5

Please sign in to comment.