diff --git a/src/uct/ib/Makefile.am b/src/uct/ib/Makefile.am index 61f2cdd08e6..c7a93fdef19 100644 --- a/src/uct/ib/Makefile.am +++ b/src/uct/ib/Makefile.am @@ -101,6 +101,7 @@ endif # HAVE_TL_RC if HAVE_TL_DC noinst_HEADERS += \ dc/dc_mlx5_ep.h \ + dc/dc_mlx5.inl \ dc/dc_mlx5.h libuct_ib_la_SOURCES += \ diff --git a/src/uct/ib/dc/dc_mlx5.c b/src/uct/ib/dc/dc_mlx5.c index eed7997c1ca..ed39605ca32 100644 --- a/src/uct/ib/dc/dc_mlx5.c +++ b/src/uct/ib/dc/dc_mlx5.c @@ -8,11 +8,11 @@ # include "config.h" #endif +#include "dc_mlx5.inl" #include "dc_mlx5.h" #include "dc_mlx5_ep.h" #include -#include #include #include #include @@ -226,11 +226,7 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface) iface, dci, qp_num, txqp, hw_ci); uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci); - - uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci)); - ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max); - - uct_rc_iface_update_reads(&iface->super.super); + uct_dc_mlx5_update_tx_res(iface, txwq, txqp, hw_ci); /** * Note: DCI is released after handling completion callbacks, @@ -1327,7 +1323,10 @@ uct_dc_mlx5_dci_keepalive_handle_failure(uct_dc_mlx5_iface_t *iface, ucs_mpool_put(op); reset_dci: - uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status); + uct_rc_txqp_available_set(txqp, iface->super.super.config.tx_qp_len); + uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, + txwq->sw_pi, 0); + uct_dc_mlx5_iface_reset_dci(iface, dci); } ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface) @@ -1349,21 +1348,15 @@ ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface) return UCS_OK; } -void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci, - ucs_status_t ep_status) +void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci) { - uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md, - uct_ib_mlx5_md_t); - uct_ib_mlx5_txwq_t *txwq; - uct_rc_txqp_t *txqp; + uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md, + uct_ib_mlx5_md_t); + uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq; ucs_status_t status; ucs_debug("iface %p reset dci[%d]", iface, dci); - UCT_DC_MLX5_IFACE_TXQP_DCI_GET(iface, dci, txqp, txwq); - uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len); - uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, - txwq->sw_pi, 0); ucs_assert(txwq->super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS); /* Synchronize CQ index with the driver, since it would remove pending @@ -1402,14 +1395,28 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface, ucs_status_t status; ucs_log_level_t log_lvl; + if (ep->flags & (UCT_DC_MLX5_EP_FLAG_ERR_HANDLER_INVOKED | + UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL)) { + return; + } + if (ep_status == UCS_ERR_CANCELED) { return; } + if (ep == iface->tx.fc_ep) { + /* Do not report errors on flow control endpoint */ + ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface, + ucs_status_string(ep_status)); + return; + } + status = uct_iface_handle_ep_err(&ib_iface->super.super, &ep->super.super, ep_status); log_lvl = uct_ib_iface_failure_log_level(ib_iface, status, ep_status); uct_ib_mlx5_completion_with_err(ib_iface, (uct_ib_mlx5_err_cqe_t*)cqe, txwq, log_lvl); + + ep->flags |= UCT_DC_MLX5_EP_FLAG_ERR_HANDLER_INVOKED; } diff --git a/src/uct/ib/dc/dc_mlx5.h b/src/uct/ib/dc/dc_mlx5.h index a0d1d23793b..8ba435a038e 100644 --- a/src/uct/ib/dc/dc_mlx5.h +++ b/src/uct/ib/dc/dc_mlx5.h @@ -257,9 +257,7 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface, uct_ib_mlx5_txwq_t *txwq, ucs_status_t ep_status); -void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, - uint8_t dci, - ucs_status_t ep_status); +void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci); #if HAVE_DEVX diff --git a/src/uct/ib/dc/dc_mlx5.inl b/src/uct/ib/dc/dc_mlx5.inl new file mode 100644 index 00000000000..3d670e9f8c6 --- /dev/null +++ b/src/uct/ib/dc/dc_mlx5.inl @@ -0,0 +1,22 @@ +/** +* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED. +* +* See file LICENSE for terms. +*/ + +#include "dc_mlx5.h" + +#include +#include "uct/ib/rc/base/rc_iface.h" +#include "uct/ib/rc/base/rc_ep.h" + + +static UCS_F_ALWAYS_INLINE void +uct_dc_mlx5_update_tx_res(uct_dc_mlx5_iface_t *iface, uct_ib_mlx5_txwq_t *txwq, + uct_rc_txqp_t *txqp, uint16_t hw_ci) +{ + uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci)); + ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max); + + uct_rc_iface_update_reads(&iface->super.super); +} diff --git a/src/uct/ib/dc/dc_mlx5_ep.c b/src/uct/ib/dc/dc_mlx5_ep.c index 622aa65e948..3e9b747aeeb 100644 --- a/src/uct/ib/dc/dc_mlx5_ep.c +++ b/src/uct/ib/dc/dc_mlx5_ep.c @@ -8,10 +8,10 @@ # include "config.h" #endif +#include "dc_mlx5.inl" #include "dc_mlx5_ep.h" #include "dc_mlx5.h" -#include #include #define UCT_DC_MLX5_IFACE_TXQP_GET(_iface, _ep, _txqp, _txwq) \ @@ -554,24 +554,11 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, { uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); + uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md, + uct_ib_mlx5_md_t); ucs_status_t status; UCT_DC_MLX5_TXQP_DECL(txqp, txwq); - if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) { - if (uct_dc_mlx5_iface_is_dci_rand(iface)) { - return UCS_ERR_UNSUPPORTED; - } - - uct_ep_pending_purge(tl_ep, NULL, 0); - if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) { - /* No dci -> no WQEs -> HW is clean, nothing to cancel */ - return UCS_OK; - } - - uct_dc_mlx5_ep_handle_failure(ep, NULL, UCS_ERR_CANCELED); - return UCS_OK; - } - if (!uct_dc_mlx5_iface_has_tx_resources(iface)) { return UCS_ERR_NO_RESOURCE; } @@ -600,6 +587,21 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq); + if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) { + ucs_assert(ucs_arbiter_group_is_empty(&ep->arb_group)); + ucs_assert(!(ep->flags & UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL)); + if (uct_dc_mlx5_iface_is_dci_rand(iface)) { + return UCS_ERR_UNSUPPORTED; + } + + status = uct_ib_mlx5_modify_qp_state(md, &txwq->super, IBV_QPS_ERR); + if (status != UCS_OK) { + return status; + } + + ep->flags |= UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL; + } + return uct_rc_txqp_add_flush_comp(&iface->super.super, &ep->super, txqp, comp, txwq->sig_pi); } @@ -826,6 +828,7 @@ ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, uct_dc_mlx5_iface_t); uct_ib_iface_t *ib_iface = &iface->super.super.super; struct ibv_ah_attr ah_attr = {.is_global = 0}; + uct_rc_iface_send_op_t *grant_ep; uct_dc_fc_sender_data_t sender; uct_dc_fc_request_t *dc_req; struct mlx5_wqe_av mlx5_av; @@ -892,9 +895,18 @@ ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, sender.global.gid = ib_iface->gid_info.gid; sender.global.is_global = dc_ep->flags & UCT_DC_MLX5_EP_FLAG_GRH; + grant_ep = ucs_mpool_get(&iface->super.super.tx.send_op_mp); + if (ucs_unlikely(grant_ep == NULL)) { + ucs_error("failed to allocate fc hard req grant op"); + return UCS_ERR_NO_MEMORY; + } + UCS_STATS_UPDATE_COUNTER(dc_ep->fc.stats, UCT_RC_FC_STAT_TX_HARD_REQ, 1); + uct_rc_ep_init_send_op(grant_ep, UCT_RC_IFACE_SEND_OP_FLAG_FC_GRANT, + NULL, (uct_rc_send_handler_t)ucs_mpool_put); + grant_ep->ep = tl_ep; uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI, txqp, txwq, MLX5_OPCODE_SEND_IMM, &sender.global, sizeof(sender.global), op, sender.ep, @@ -904,6 +916,7 @@ ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, uct_dc_mlx5_ep_get_grh(dc_ep), uct_ib_mlx5_wqe_av_size(&dc_ep->av), MLX5_WQE_CTRL_SOLICITED, INT_MAX); + uct_rc_txqp_add_send_op_sn(txqp, grant_ep, txwq->prev_sw_pi); } return UCS_OK; @@ -932,6 +945,28 @@ static void uct_dc_mlx5_ep_keepalive_cleanup(uct_dc_mlx5_ep_t *ep) } } +static void uct_dc_mlx5_txqp_purge_outstanding(uct_dc_mlx5_iface_t *iface, + uct_rc_txqp_t *txqp, + ucs_status_t status, + uint16_t sn, int warn) +{ + uct_rc_iface_send_op_t *op; + uct_dc_mlx5_ep_t *ep; + + ucs_queue_for_each_extract(op, &txqp->outstanding, queue, + UCS_CIRCULAR_COMPARE16(op->sn, <=, sn)) { + if (op->flags & UCT_RC_IFACE_SEND_OP_FLAG_FC_GRANT) { + /* grant operation aborted so need to clear "wait-for-grant" flag */ + ep = ucs_derived_of(op->ep, uct_dc_mlx5_ep_t); + uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep); + ucs_mpool_put(op); + } else { + uct_rc_txqp_purge_outstanding_op(&iface->super.super, op, status, + warn); + } + } +} + UCS_CLASS_INIT_FUNC(uct_dc_mlx5_ep_t, uct_dc_mlx5_iface_t *iface, const uct_dc_mlx5_iface_addr_t *if_addr, uct_ib_mlx5_base_av_t *av) @@ -975,10 +1010,9 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t) "iface (%p) ep (%p) dci leak detected: dci=%d", iface, self, self->dci); - /* TODO should be removed by flush */ - uct_rc_txqp_purge_outstanding(&iface->super.super, - &iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED, - iface->tx.dcis[self->dci].txwq.sw_pi, 1); + uct_dc_mlx5_txqp_purge_outstanding(iface, &iface->tx.dcis[self->dci].txqp, + UCS_ERR_CANCELED, + iface->tx.dcis[self->dci].txwq.sw_pi, 1); ucs_assert(ucs_queue_is_empty(&iface->tx.dcis[self->dci].txqp.outstanding)); iface->tx.dcis[self->dci].ep = NULL; } @@ -1310,34 +1344,25 @@ ucs_status_t uct_dc_mlx5_ep_check_fc(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_ void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg, ucs_status_t ep_status) { + struct mlx5_cqe64 *cqe = arg; uct_iface_h tl_iface = ep->super.super.iface; uint8_t dci = ep->dci; uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t); + uct_rc_txqp_t *txqp = &iface->tx.dcis[dci].txqp; uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq; + uint16_t pi = ntohs(cqe->wqe_counter); ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface)); - uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status); - /* since we removed all outstanding ops on the dci, it should be released */ + uct_dc_mlx5_update_tx_res(iface, txwq, txqp, pi); + uct_dc_mlx5_txqp_purge_outstanding(iface, txqp, ep_status, pi, 0); + ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI); uct_dc_mlx5_iface_dci_put(iface, dci); - ucs_assert_always(ep->dci == UCT_DC_MLX5_EP_NO_DCI); - - if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) { - /* No need to wait for grant on this ep anymore */ - uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep); - } + uct_dc_mlx5_iface_set_ep_failed(iface, ep, cqe, txwq, ep_status); - if (ep == iface->tx.fc_ep) { - ucs_assert(ep_status != UCS_ERR_CANCELED); - /* Cannot handle errors on flow-control endpoint. - * Or shall we ignore them? - */ - ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface, - ucs_status_string(ep_status)); - } else { - uct_dc_mlx5_iface_set_ep_failed(iface, ep, (struct mlx5_cqe64*)arg, - txwq, ep_status); + if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) { + uct_dc_mlx5_iface_reset_dci(iface, dci); } } diff --git a/src/uct/ib/dc/dc_mlx5_ep.h b/src/uct/ib/dc/dc_mlx5_ep.h index 4650df9df7e..4e17fdf51e1 100644 --- a/src/uct/ib/dc/dc_mlx5_ep.h +++ b/src/uct/ib/dc/dc_mlx5_ep.h @@ -17,22 +17,29 @@ enum uct_dc_mlx5_ep_flags { - UCT_DC_MLX5_EP_FLAG_TX_WAIT = UCS_BIT(0), /* ep is in the tx_wait state. See - description of the dcs+quota dci - selection policy above */ - UCT_DC_MLX5_EP_FLAG_GRH = UCS_BIT(1), /* ep has GRH address. Used by - dc_mlx5 endpoint */ - UCT_DC_MLX5_EP_FLAG_VALID = UCS_BIT(2), /* ep is a valid endpoint */ + UCT_DC_MLX5_EP_FLAG_TX_WAIT = UCS_BIT(0), /* ep is in the tx_wait state. See + description of the dcs+quota dci + selection policy above */ + UCT_DC_MLX5_EP_FLAG_GRH = UCS_BIT(1), /* ep has GRH address. Used by + dc_mlx5 endpoint */ + UCT_DC_MLX5_EP_FLAG_VALID = UCS_BIT(2), /* ep is a valid endpoint */ /* Indicates that FC grant has been requested, but is not received yet. * Flush will not complete until an outgoing grant request is acked. * It is needed to avoid the following cases: * 1) Grant arrives for the recently deleted ep. * 2) QP resources are available, but there are some pending requests. */ - UCT_DC_MLX5_EP_FLAG_FC_WAIT_FOR_GRANT = UCS_BIT(3), + UCT_DC_MLX5_EP_FLAG_FC_WAIT_FOR_GRANT = UCS_BIT(3), + /* Keepalive Request scheduled: indicates that keepalive request * is scheduled in outstanding queue and no more keepalive actions * are needed */ - UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4) + UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4), + + /* Flush cancel was executed on EP */ + UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL = UCS_BIT(5), + + /* Error handler already called or flush(CANCEL) disabled it */ + UCT_DC_MLX5_EP_FLAG_ERR_HANDLER_INVOKED = UCS_BIT(6), }; diff --git a/src/uct/ib/rc/accel/rc_mlx5_iface.c b/src/uct/ib/rc/accel/rc_mlx5_iface.c index ee3fccf0642..6081da04030 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_iface.c +++ b/src/uct/ib/rc/accel/rc_mlx5_iface.c @@ -198,12 +198,12 @@ uct_rc_mlx5_iface_handle_failure(uct_ib_iface_t *ib_iface, void *arg, uct_rc_mlx5_common_update_tx_res(iface, &ep->tx.wq, &ep->super.txqp, pi); uct_rc_txqp_purge_outstanding(iface, &ep->super.txqp, ep_status, pi, 0); - if (ep->super.flags & (UCT_RC_EP_FLAG_NO_ERR_HANDLER | + if (ep->super.flags & (UCT_RC_EP_FLAG_ERR_HANDLER_INVOKED | UCT_RC_EP_FLAG_FLUSH_CANCEL)) { return; } - ep->super.flags |= UCT_RC_EP_FLAG_NO_ERR_HANDLER; + ep->super.flags |= UCT_RC_EP_FLAG_ERR_HANDLER_INVOKED; status = uct_iface_handle_ep_err(&iface->super.super.super, &ep->super.super.super, ep_status); diff --git a/src/uct/ib/rc/base/rc_ep.c b/src/uct/ib/rc/base/rc_ep.c index f3ec3c4a305..34df2c17f25 100644 --- a/src/uct/ib/rc/base/rc_ep.c +++ b/src/uct/ib/rc/base/rc_ep.c @@ -430,52 +430,61 @@ ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self) return status; } +void uct_rc_txqp_purge_outstanding_op(uct_rc_iface_t *iface, + uct_rc_iface_send_op_t *op, + ucs_status_t status, int warn) +{ + uct_rc_iface_send_desc_t *desc; + + if (op->handler != (uct_rc_send_handler_t)ucs_mpool_put) { + /* Allow clean flush cancel op from destroy flow */ + if (warn && (op->handler != uct_rc_ep_flush_op_completion_handler)) { + ucs_warn("destroying uncompleted operation %p handler %s", + op, ucs_debug_get_symbol_name(op->handler)); + } + + if (op->user_comp != NULL) { + /* This must be uct_rc_ep_get_bcopy_handler, + * uct_rc_ep_get_bcopy_handler_no_completion, + * uct_rc_ep_get_zcopy_completion_handler, + * uct_rc_ep_flush_op_completion_handler or + * one of the atomic handlers, + * so invoke user completion */ + uct_invoke_completion(op->user_comp, status); + } + + /* Need to release rdma_read resources taken by get operations */ + if ((op->handler == uct_rc_ep_get_bcopy_handler) || + (op->handler == uct_rc_ep_get_bcopy_handler_no_completion)) { + uct_rc_op_release_get_bcopy(op); + uct_rc_iface_update_reads(iface); + } else if (op->handler == uct_rc_ep_get_zcopy_completion_handler) { + uct_rc_op_release_get_zcopy(op); + uct_rc_iface_update_reads(iface); + } + } + + op->flags &= ~(UCT_RC_IFACE_SEND_OP_FLAG_INUSE | + UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY); + if ((op->handler == uct_rc_ep_send_op_completion_handler) || + (op->handler == uct_rc_ep_get_zcopy_completion_handler)) { + uct_rc_iface_put_send_op(op); + } else if (op->handler == uct_rc_ep_flush_op_completion_handler) { + ucs_mpool_put(op); + } else { + desc = ucs_derived_of(op, uct_rc_iface_send_desc_t); + ucs_mpool_put(desc); + } +} + void uct_rc_txqp_purge_outstanding(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, ucs_status_t status, uint16_t sn, int warn) { uct_rc_iface_send_op_t *op; - uct_rc_iface_send_desc_t *desc; ucs_queue_for_each_extract(op, &txqp->outstanding, queue, UCS_CIRCULAR_COMPARE16(op->sn, <=, sn)) { - if (op->handler != (uct_rc_send_handler_t)ucs_mpool_put) { - /* Allow clean flush cancel op from destroy flow */ - if (warn && (op->handler != uct_rc_ep_flush_op_completion_handler)) { - ucs_warn("destroying txqp %p with uncompleted operation %p handler %s", - txqp, op, ucs_debug_get_symbol_name(op->handler)); - } - - if (op->user_comp != NULL) { - /* This must be uct_rc_ep_get_bcopy_handler, - * uct_rc_ep_get_bcopy_handler_no_completion, - * uct_rc_ep_get_zcopy_completion_handler, - * uct_rc_ep_flush_op_completion_handler or - * one of the atomic handlers, - * so invoke user completion */ - uct_invoke_completion(op->user_comp, status); - } - - /* Need to release rdma_read resources taken by get operations */ - if ((op->handler == uct_rc_ep_get_bcopy_handler) || - (op->handler == uct_rc_ep_get_bcopy_handler_no_completion)) { - uct_rc_op_release_get_bcopy(op); - uct_rc_iface_update_reads(iface); - } else if (op->handler == uct_rc_ep_get_zcopy_completion_handler) { - uct_rc_op_release_get_zcopy(op); - uct_rc_iface_update_reads(iface); - } - } - op->flags &= ~(UCT_RC_IFACE_SEND_OP_FLAG_INUSE | - UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY); - if ((op->handler == uct_rc_ep_send_op_completion_handler) || - (op->handler == uct_rc_ep_get_zcopy_completion_handler)) { - uct_rc_iface_put_send_op(op); - } else if (op->handler == uct_rc_ep_flush_op_completion_handler) { - ucs_mpool_put(op); - } else { - desc = ucs_derived_of(op, uct_rc_iface_send_desc_t); - ucs_mpool_put(desc); - } + uct_rc_txqp_purge_outstanding_op(iface, op, status, warn); } } diff --git a/src/uct/ib/rc/base/rc_ep.h b/src/uct/ib/rc/base/rc_ep.h index fa3f031c820..fe52ffd8899 100644 --- a/src/uct/ib/rc/base/rc_ep.h +++ b/src/uct/ib/rc/base/rc_ep.h @@ -42,38 +42,38 @@ enum { /* Keepalive Request scheduled: indicates that keepalive request * is scheduled in pending queue and no more keepalive actions * are needed */ - UCT_RC_EP_FLAG_KEEPALIVE_PENDING = UCS_BIT(0), + UCT_RC_EP_FLAG_KEEPALIVE_PENDING = UCS_BIT(0), /* EP is connected to peer */ - UCT_RC_EP_FLAG_CONNECTED = UCS_BIT(1), + UCT_RC_EP_FLAG_CONNECTED = UCS_BIT(1), /* Flush cancel was executed on EP */ - UCT_RC_EP_FLAG_FLUSH_CANCEL = UCS_BIT(2), + UCT_RC_EP_FLAG_FLUSH_CANCEL = UCS_BIT(2), /* Error handler already called or flush(CANCEL) disabled it */ - UCT_RC_EP_FLAG_NO_ERR_HANDLER = UCS_BIT(3), + UCT_RC_EP_FLAG_ERR_HANDLER_INVOKED = UCS_BIT(3), /* Soft Credit Request: indicates that peer needs to piggy-back credits * grant to counter AM (if any). Can be bundled with * UCT_RC_EP_FLAG_FC_GRANT */ - UCT_RC_EP_FLAG_FC_SOFT_REQ = UCS_BIT(UCT_AM_ID_BITS), + UCT_RC_EP_FLAG_FC_SOFT_REQ = UCS_BIT(UCT_AM_ID_BITS), /* Hard Credit Request: indicates that wnd is close to be exhausted. * The peer must send separate AM with credit grant as soon as it * receives AM with this bit set. Can be bundled with * UCT_RC_EP_FLAG_FC_GRANT */ - UCT_RC_EP_FLAG_FC_HARD_REQ = UCS_BIT((UCT_AM_ID_BITS) + 1), + UCT_RC_EP_FLAG_FC_HARD_REQ = UCS_BIT((UCT_AM_ID_BITS) + 1), /* Credit Grant: ep should update its FC wnd as soon as it receives AM with * this bit set. Can be bundled with either soft or hard request bits */ - UCT_RC_EP_FLAG_FC_GRANT = UCS_BIT((UCT_AM_ID_BITS) + 2), + UCT_RC_EP_FLAG_FC_GRANT = UCS_BIT((UCT_AM_ID_BITS) + 2), /* Special FC AM with Credit Grant: Just an empty message indicating * credit grant. Can't be bundled with any other FC flag (as it consumes * all 3 FC bits). */ - UCT_RC_EP_FC_PURE_GRANT = (UCT_RC_EP_FLAG_FC_HARD_REQ | - UCT_RC_EP_FLAG_FC_SOFT_REQ | - UCT_RC_EP_FLAG_FC_GRANT) + UCT_RC_EP_FC_PURE_GRANT = (UCT_RC_EP_FLAG_FC_HARD_REQ | + UCT_RC_EP_FLAG_FC_SOFT_REQ | + UCT_RC_EP_FLAG_FC_GRANT) }; /* @@ -277,6 +277,10 @@ void uct_rc_fc_cleanup(uct_rc_fc_t *fc); ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self); +void uct_rc_txqp_purge_outstanding_op(uct_rc_iface_t *iface, + uct_rc_iface_send_op_t *op, + ucs_status_t status, int warn); + void uct_rc_txqp_purge_outstanding(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, ucs_status_t status, uint16_t sn, int warn); diff --git a/src/uct/ib/rc/base/rc_iface.h b/src/uct/ib/rc/base/rc_iface.h index adac6711648..843562093fc 100644 --- a/src/uct/ib/rc/base/rc_iface.h +++ b/src/uct/ib/rc/base/rc_iface.h @@ -98,19 +98,20 @@ enum { /* flags for uct_rc_iface_send_op_t */ enum { + UCT_RC_IFACE_SEND_OP_FLAG_FC_GRANT = UCS_BIT(11), #ifdef NVALGRIND - UCT_RC_IFACE_SEND_OP_FLAG_IOV = 0, + UCT_RC_IFACE_SEND_OP_FLAG_IOV = 0, #else - UCT_RC_IFACE_SEND_OP_FLAG_IOV = UCS_BIT(12), /* save iovec to make mem defined */ + UCT_RC_IFACE_SEND_OP_FLAG_IOV = UCS_BIT(12), /* save iovec to make mem defined */ #endif #if UCS_ENABLE_ASSERT - UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY = UCS_BIT(13), /* zcopy */ - UCT_RC_IFACE_SEND_OP_FLAG_IFACE = UCS_BIT(14), /* belongs to iface ops buffer */ - UCT_RC_IFACE_SEND_OP_FLAG_INUSE = UCS_BIT(15) /* queued on a txqp */ + UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY = UCS_BIT(13), /* zcopy */ + UCT_RC_IFACE_SEND_OP_FLAG_IFACE = UCS_BIT(14), /* belongs to iface ops buffer */ + UCT_RC_IFACE_SEND_OP_FLAG_INUSE = UCS_BIT(15) /* queued on a txqp */ #else - UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY = 0, - UCT_RC_IFACE_SEND_OP_FLAG_IFACE = 0, - UCT_RC_IFACE_SEND_OP_FLAG_INUSE = 0 + UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY = 0, + UCT_RC_IFACE_SEND_OP_FLAG_IFACE = 0, + UCT_RC_IFACE_SEND_OP_FLAG_INUSE = 0 #endif }; diff --git a/src/uct/ib/rc/verbs/rc_verbs_iface.c b/src/uct/ib/rc/verbs/rc_verbs_iface.c index 7b155956f89..d1ddf9f9f32 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_iface.c +++ b/src/uct/ib/rc/verbs/rc_verbs_iface.c @@ -78,12 +78,12 @@ static void uct_rc_verbs_handle_failure(uct_ib_iface_t *ib_iface, void *arg, ep->txcnt.ci + count, 0); uct_rc_verbs_update_tx_res(iface, ep, count); - if (ep->super.flags & (UCT_RC_EP_FLAG_NO_ERR_HANDLER | + if (ep->super.flags & (UCT_RC_EP_FLAG_ERR_HANDLER_INVOKED | UCT_RC_EP_FLAG_FLUSH_CANCEL)) { return; } - ep->super.flags |= UCT_RC_EP_FLAG_NO_ERR_HANDLER; + ep->super.flags |= UCT_RC_EP_FLAG_ERR_HANDLER_INVOKED; status = uct_iface_handle_ep_err(&iface->super.super.super, &ep->super.super.super, ep_status); diff --git a/test/gtest/uct/test_flush.cc b/test/gtest/uct/test_flush.cc index b91733d7a43..1a5dc4162e0 100644 --- a/test/gtest/uct/test_flush.cc +++ b/test/gtest/uct/test_flush.cc @@ -552,7 +552,8 @@ UCT_INSTANTIATE_TEST_CASE(uct_flush_test) class uct_cancel_test : public uct_test { public: - static const size_t BUF_SIZE = 8 * 1024; + static const size_t BUF_SIZE = 8 * 1024; + static const size_t BUF_SIZE_DC = 1 * 1024; class peer { public: @@ -569,6 +570,8 @@ class uct_cancel_test : public uct_test { } void connect() { + m_e->destroy_eps(); + m_peer->m_e->destroy_eps(); m_e->connect(0, *m_peer->m_e, 0); m_peer->m_e->connect(0, *m_e, 0); } @@ -607,6 +610,10 @@ class uct_cancel_test : public uct_test { size_t header_length = 0; uct_iov_t iov; + if (has_transport("dc_mlx5")) { + size = ucs_min(BUF_SIZE_DC, size); + } + iov.buffer = (char*)sendbuf.ptr() + header_length; iov.count = 1; iov.length = size - header_length; @@ -620,6 +627,10 @@ class uct_cancel_test : public uct_test { mapped_buffer &sendbuf = *s->m_buf; mapped_buffer &recvbuf = *s->m_peer->m_buf; + if (has_transport("dc_mlx5")) { + size = ucs_min(BUF_SIZE_DC, size); + } + UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), size, sendbuf.memh(), s->m_e->iface_attr().cap.get.max_iov); @@ -647,7 +658,7 @@ class uct_cancel_test : public uct_test { done.count = flushing.size() + 1; done.status = UCS_OK; done.func = NULL; - ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(50.0); + ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(200.0); while (!flushing.empty() && (ucs_get_time() < loop_end_limit)) { std::list::iterator iter = flushing.begin(); while (iter != flushing.end()) { @@ -666,14 +677,17 @@ class uct_cancel_test : public uct_test { short_progress_loop(); } ASSERT_UCS_OK_OR_INPROGRESS(status); + double holdup = 200.0 - ucs_time_to_sec(loop_end_limit - ucs_get_time()); + if (holdup > 10.0) { + UCS_TEST_MESSAGE << "flush took " << holdup << " sec"; + } /* coverity[loop_condition] */ while (done.count != 1) { progress(); } - m_s1->m_e->destroy_eps(); - m_s1->m_e->connect(0, *m_s0->m_e, 0); + m_s0->connect(); /* there is a chance that one side getting disconect error before * calling flush(CANCEL) */ @@ -707,7 +721,8 @@ class uct_cancel_test : public uct_test { } void do_test(send_func_t send) { - for (int i = 0; i < count(); ++i) { + ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(300.0); + for (int i = 0; (i < count()) && (ucs_get_time() < loop_end_limit); ++i) { fill(send); flush_and_reconnect(); } @@ -764,9 +779,9 @@ class uct_cancel_test : public uct_test { } void check_skip_test_tl() { - const resource *r = dynamic_cast(GetParam()); - - if ((r->tl_name != "rc_mlx5") && (r->tl_name != "rc_verbs")) { + if ((GetParam()->tl_name != "dc_mlx5") && + (GetParam()->tl_name != "rc_verbs") && + (GetParam()->tl_name != "rc_mlx5")) { UCS_TEST_SKIP_R("not supported yet"); }