Skip to content

Commit

Permalink
Merge pull request openucx#5795 from Artemy-Mellanox/topic/flush_canc…
Browse files Browse the repository at this point in the history
…el_nb-8

UCT: Flush TX
  • Loading branch information
yosefe authored Dec 1, 2020
2 parents c197746 + f52939a commit 0e0a00d
Show file tree
Hide file tree
Showing 27 changed files with 217 additions and 375 deletions.
32 changes: 26 additions & 6 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2150,6 +2150,14 @@ static unsigned ucp_worker_discard_uct_ep_destroy_progress(void *arg)
return 1;
}

/* Release a discard-flush request.
 *
 * Calls ucp_worker_flush_ops_count_dec() on the worker stored in
 * req->send.discard_uct_ep.ucp_worker and then hands the request to
 * ucp_request_put(). The worker handle is read from the request before the
 * request is put. */
static void ucp_worker_put_flush_req(ucp_request_t *req)
{
    ucp_worker_flush_ops_count_dec(req->send.discard_uct_ep.ucp_worker);
    ucp_request_put(req);
}

static void
ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self)
{
Expand All @@ -2161,6 +2169,12 @@ ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self)
ucp_trace_req(req, "discard_uct_ep flush completion status %s",
ucs_status_string(self->status));

if (self->status == UCS_ERR_CANCELED) {
/* we run from EP cleanup - just release request */
ucp_worker_put_flush_req(req);
return;
}

/* don't destroy UCT EP from the flush completion callback, schedule
* a progress callback on the main thread to destroy UCT EP */
uct_worker_progress_register_safe(worker->uct,
Expand Down Expand Up @@ -2221,18 +2235,24 @@ static unsigned ucp_worker_discard_uct_ep_progress(void *arg)
static int ucp_worker_discard_remove_filter(const ucs_callbackq_elem_t *elem,
void *arg)
{
return (elem->cb == ucp_worker_discard_uct_ep_progress) ||
(elem->cb == ucp_worker_discard_uct_ep_destroy_progress);
if ((elem->cb != ucp_worker_discard_uct_ep_destroy_progress) &&
(elem->cb != ucp_worker_discard_uct_ep_progress)) {
return 0;
}

ucp_worker_put_flush_req((ucp_request_t*)elem->arg);
return 1;
}

static void ucp_worker_discarded_uct_eps_cleanup(ucp_worker_h worker)
{
uct_ep_h uct_ep;
ucp_request_t *req;

kh_foreach(&worker->discard_uct_ep_hash, uct_ep, req, {
ucp_worker_flush_ops_count_dec(worker);
ucp_request_put(req);
/* If an EP owns the discard operation, ep_destroy will cancel it.
 * We run after uct_worker_progress_unregister_safe and
 * ucp_worker_discard_remove_filter, so every request has either been
 * canceled by us or has already finished and been removed from the hash. */
kh_foreach_key(&worker->discard_uct_ep_hash, uct_ep, {
uct_ep_destroy(uct_ep);
})
}
Expand Down
103 changes: 7 additions & 96 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,106 +304,17 @@ ucs_status_t uct_base_ep_fence(uct_ep_h tl_ep, unsigned flags)
return UCS_OK;
}

static void uct_ep_failed_purge_cb(uct_pending_req_t *self, void *arg)
ucs_status_t uct_iface_handle_ep_err(uct_iface_h iface, uct_ep_h ep,
ucs_status_t status)
{
uct_pending_req_queue_push((ucs_queue_head_t*)arg, self);
}

/* Purge handler for an endpoint whose iface is a uct_failed_iface_t.
 *
 * Walks the failed iface's pend_q with ucs_queue_for_each_extract (the
 * condition argument is constant 1). Each request is passed to cb together
 * with arg when cb is provided; when cb is NULL a warning is logged for the
 * request instead.
 *
 * tl_ep  endpoint whose iface holds the pending queue
 * cb     purge callback, may be NULL
 * arg    opaque argument forwarded to cb */
static void uct_ep_failed_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                                void *arg)
{
    uct_failed_iface_t *failed_iface;
    uct_pending_req_t *pending;

    failed_iface = ucs_derived_of(tl_ep->iface, uct_failed_iface_t);

    ucs_queue_for_each_extract(pending, &failed_iface->pend_q, priv, 1) {
        if (cb == NULL) {
            /* nobody to hand the request to - only report it */
            ucs_warn("ep=%p cancelling user pending request %p", tl_ep, pending);
        } else {
            cb(pending, arg);
        }
    }
}

static void uct_ep_failed_destroy(uct_ep_h tl_ep)
{
/* Warn user if some pending reqs left*/
uct_ep_failed_purge (tl_ep, NULL, NULL);
uct_base_iface_t *base_iface = ucs_derived_of(iface, uct_base_iface_t);

ucs_free(tl_ep->iface);
ucs_free(tl_ep);
}

ucs_status_t uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep,
uct_iface_h tl_iface, ucs_status_t status)
{
uct_failed_iface_t *f_iface;
uct_iface_ops_t *ops;
uct_base_iface_t *iface = ucs_derived_of(tl_iface, uct_base_iface_t);

ucs_debug("set ep %p to failed state", tl_ep);

/* TBD: consider allocating one instance per interface
* rather than for each endpoint */
f_iface = ucs_malloc(sizeof(*f_iface), "failed iface");
if (f_iface == NULL) {
ucs_error("Could not create failed iface (nomem)");
return status;
}

ucs_queue_head_init(&f_iface->pend_q);
ops = &f_iface->super.ops;

/* Move all pending requests to the queue.
* Failed ep will use that queue for purge. */
uct_ep_pending_purge(tl_ep, uct_ep_failed_purge_cb, &f_iface->pend_q);

ops->ep_put_short = (uct_ep_put_short_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_put_bcopy = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout;
ops->ep_put_zcopy = (uct_ep_put_zcopy_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_get_short = (uct_ep_get_short_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_get_bcopy = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_get_zcopy = (uct_ep_get_zcopy_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_am_short = (uct_ep_am_short_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_am_bcopy = (uct_ep_am_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout;
ops->ep_am_zcopy = (uct_ep_am_zcopy_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_atomic_cswap64 = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_atomic_cswap32 = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_atomic64_post = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_atomic32_post = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_atomic64_fetch = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_atomic32_fetch = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_tag_eager_short = (uct_ep_tag_eager_short_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_tag_eager_bcopy = (uct_ep_tag_eager_bcopy_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_tag_eager_zcopy = (uct_ep_tag_eager_zcopy_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_tag_rndv_zcopy = (uct_ep_tag_rndv_zcopy_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_tag_rndv_cancel = (uct_ep_tag_rndv_cancel_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_tag_rndv_request = (uct_ep_tag_rndv_request_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_pending_add = (uct_ep_pending_add_func_t)ucs_empty_function_return_busy;
ops->ep_pending_purge = uct_ep_failed_purge;
ops->ep_flush = (uct_ep_flush_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_fence = (uct_ep_fence_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_check = (uct_ep_check_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_connect_to_ep = (uct_ep_connect_to_ep_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_destroy = uct_ep_failed_destroy;
ops->ep_get_address = (uct_ep_get_address_func_t)ucs_empty_function_return_ep_timeout;

ucs_class_call_cleanup_chain(cls, tl_ep, -1);

tl_ep->iface = &f_iface->super;

if (iface->err_handler) {
return iface->err_handler(iface->err_handler_arg, tl_ep, status);
} else if (status == UCS_ERR_CANCELED) {
ucs_debug("error %s was suppressed for ep %p",
ucs_status_string(UCS_ERR_CANCELED), tl_ep);
/* Suppress this since the cancellation is initiated by user. */
status = UCS_OK;
} else {
ucs_debug("error %s was not handled for ep %p",
ucs_status_string(status), tl_ep);
if (base_iface->err_handler) {
return base_iface->err_handler(base_iface->err_handler_arg, ep, status);
}

ucs_assert(status != UCS_ERR_CANCELED);
ucs_debug("error %s was not handled for ep %p", ucs_status_string(status), ep);
return status;
}

Expand Down
4 changes: 2 additions & 2 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,8 @@ void uct_iface_set_async_event_params(const uct_iface_params_t *params,
uct_async_event_cb_t *event_cb,
void **event_arg);

ucs_status_t uct_set_ep_failed(ucs_class_t* cls, uct_ep_h tl_ep, uct_iface_h
tl_iface, ucs_status_t status);
ucs_status_t uct_iface_handle_ep_err(uct_iface_h iface, uct_ep_h ep,
ucs_status_t status);

void uct_base_iface_query(uct_base_iface_t *iface, uct_iface_attr_t *iface_attr);

Expand Down
1 change: 0 additions & 1 deletion src/uct/ib/base/ib_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ struct uct_ib_iface_ops {
uct_ib_iface_arm_cq_func_t arm_cq;
uct_ib_iface_event_cq_func_t event_cq;
uct_ib_iface_handle_failure_func_t handle_failure;
uct_ib_iface_set_ep_failed_func_t set_ep_failed;
};


Expand Down
23 changes: 8 additions & 15 deletions src/uct/ib/dc/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,6 @@ static void uct_dc_mlx5_iface_progress_enable(uct_iface_h tl_iface, unsigned fla
uct_base_iface_progress_enable_cb(&iface->super.super, iface->progress, flags);
}

/* set_ep_failed hook for DC endpoints: forwards to uct_set_ep_failed() with
 * the uct_dc_mlx5_ep_t class and the iface handle embedded in ib_iface, and
 * returns whatever that call returns. */
static ucs_status_t uct_dc_mlx5_ep_set_failed(uct_ib_iface_t *ib_iface,
                                              uct_ep_h ep, ucs_status_t status)
{
    uct_iface_h tl_iface = &ib_iface->super.super;

    return uct_set_ep_failed(&UCS_CLASS_NAME(uct_dc_mlx5_ep_t), ep, tl_iface,
                             status);
}

static UCS_F_ALWAYS_INLINE unsigned
uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface)
{
Expand Down Expand Up @@ -1133,7 +1126,6 @@ static uct_rc_iface_ops_t uct_dc_mlx5_iface_ops = {
.arm_cq = uct_rc_mlx5_iface_common_arm_cq,
.event_cq = uct_rc_mlx5_iface_common_event_cq,
.handle_failure = uct_dc_mlx5_iface_handle_failure,
.set_ep_failed = uct_dc_mlx5_ep_set_failed,
},
.init_rx = uct_dc_mlx5_init_rx,
.cleanup_rx = uct_dc_mlx5_cleanup_rx,
Expand Down Expand Up @@ -1410,13 +1402,14 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface,
ucs_status_t status;
ucs_log_level_t log_lvl;

status = ib_iface->ops->set_ep_failed(ib_iface, &ep->super.super,
ep_status);
log_lvl = uct_ib_iface_failure_log_level(ib_iface, status, ep_status);

if (ep_status != UCS_ERR_CANCELED) {
uct_ib_mlx5_completion_with_err(ib_iface, (uct_ib_mlx5_err_cqe_t*)cqe,
txwq, log_lvl);
if (ep_status == UCS_ERR_CANCELED) {
return;
}

status = uct_iface_handle_ep_err(&ib_iface->super.super,
&ep->super.super, ep_status);
log_lvl = uct_ib_iface_failure_log_level(ib_iface, status, ep_status);
uct_ib_mlx5_completion_with_err(ib_iface, (uct_ib_mlx5_err_cqe_t*)cqe,
txwq, log_lvl);
}

2 changes: 1 addition & 1 deletion src/uct/ib/mlx5/ib_mlx5_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ ucs_status_t uct_ib_mlx5_completion_with_err(uct_ib_iface_t *iface,

if (ecqe->syndrome == MLX5_CQE_SYNDROME_WR_FLUSH_ERR) {
ucs_trace("QP 0x%x wqe[%d] is flushed", qp_num, wqe_index);
return status;
return UCS_ERR_CANCELED;
}

switch (ecqe->syndrome) {
Expand Down
6 changes: 0 additions & 6 deletions src/uct/ib/rc/accel/rc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,6 @@ ucs_status_t uct_rc_mlx5_ep_tag_rndv_request(uct_ep_h tl_ep, uct_tag_t tag,

ucs_status_t uct_rc_mlx5_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr);

ucs_status_t uct_rc_mlx5_ep_handle_failure(uct_rc_mlx5_ep_t *ep,
ucs_status_t status, uint16_t sn);

ucs_status_t uct_rc_mlx5_ep_set_failed(uct_ib_iface_t *iface, uct_ep_h ep,
ucs_status_t status);

void uct_rc_mlx5_ep_cleanup_qp(uct_ib_async_event_wait_t *wait_ctx);

#endif
39 changes: 9 additions & 30 deletions src/uct/ib/rc/accel/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,11 @@ ucs_status_t uct_rc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.md,
uct_ib_mlx5_md_t);
ucs_status_t status;
uint16_t sn;

if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
uct_ep_pending_purge(&ep->super.super.super, NULL, 0);
uct_rc_mlx5_ep_handle_failure(ep, UCS_ERR_CANCELED, ep->tx.wq.sw_pi);
return UCS_OK;
}

status = uct_rc_ep_flush(&ep->super, ep->tx.wq.bb_max, flags);
if (status != UCS_INPROGRESS) {
return status;
Expand All @@ -589,6 +585,13 @@ ucs_status_t uct_rc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
sn = ep->tx.wq.sig_pi;
}

if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
status = uct_ib_mlx5_modify_qp_state(md, &ep->tx.wq.super, IBV_QPS_ERR);
if (status != UCS_OK) {
return status;
}
}

return uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.super,
&ep->super.txqp, comp, sn);
}
Expand Down Expand Up @@ -1013,7 +1016,6 @@ UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_ep_t)
ep_cleanup_ctx->qp = self->tx.wq.super;
ep_cleanup_ctx->reg = self->tx.wq.reg;

/* TODO should be removed by flush */
uct_rc_txqp_purge_outstanding(&iface->super, &self->super.txqp,
UCS_ERR_CANCELED, self->tx.wq.sw_pi, 1);
#if IBV_HW_TM
Expand All @@ -1028,29 +1030,6 @@ UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_ep_t)
self->tx.wq.super.qp_num);
}

/* Handle a failure on an RC mlx5 endpoint.
 *
 * Purges the endpoint's outstanding operations with the given status up to
 * pi, credits the iface's tx.cq_available, and then invokes the iface's
 * set_ep_failed operation, whose result is returned.
 *
 * ep      failed endpoint
 * status  status passed to the purge and to set_ep_failed
 * pi      index passed to uct_rc_txqp_purge_outstanding */
ucs_status_t uct_rc_mlx5_ep_handle_failure(uct_rc_mlx5_ep_t *ep,
                                           ucs_status_t status, uint16_t pi)
{
    uct_ep_h tl_ep = &ep->super.super.super;
    uct_rc_iface_t *rc_iface;

    rc_iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);

    uct_rc_txqp_purge_outstanding(rc_iface, &ep->super.txqp, status, pi, 0);

    /* poll_cqe for mlx5 returns NULL in case of failure, and cq_available
     * is not updated for the error CQE and all outstanding WQEs */
    rc_iface->tx.cq_available += ep->tx.wq.bb_max -
                                 uct_rc_txqp_available(&ep->super.txqp);

    return rc_iface->super.ops->set_ep_failed(&rc_iface->super, tl_ep, status);
}

/* set_ep_failed hook for RC mlx5 endpoints: forwards to uct_set_ep_failed()
 * with the uct_rc_mlx5_ep_t class and the iface handle embedded in iface,
 * and returns whatever that call returns. */
ucs_status_t uct_rc_mlx5_ep_set_failed(uct_ib_iface_t *iface, uct_ep_h ep,
                                       ucs_status_t status)
{
    uct_iface_h tl_iface = &iface->super.super;

    return uct_set_ep_failed(&UCS_CLASS_NAME(uct_rc_mlx5_ep_t), ep, tl_iface,
                             status);
}

UCS_CLASS_DEFINE(uct_rc_mlx5_ep_t, uct_rc_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_rc_mlx5_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_rc_mlx5_ep_t, uct_ep_t);
Loading

0 comments on commit 0e0a00d

Please sign in to comment.