From dbcc5f22abd117d914e30b375e029b52fdc6299c Mon Sep 17 00:00:00 2001 From: Artemy Kovalyov Date: Sun, 15 Nov 2020 17:09:24 +0200 Subject: [PATCH] UCT/DC: Flush TX --- src/uct/ib/Makefile.am | 1 + src/uct/ib/dc/dc_mlx5.c | 43 ++++++++++++++++++++------------- src/uct/ib/dc/dc_mlx5.h | 4 +--- src/uct/ib/dc/dc_mlx5.inl | 22 +++++++++++++++++ src/uct/ib/dc/dc_mlx5_ep.c | 46 +++++++++++++++++------------------- src/uct/ib/dc/dc_mlx5_ep.h | 18 ++++++++++++-- test/gtest/uct/test_flush.cc | 22 ++++++++++------- 7 files changed, 102 insertions(+), 54 deletions(-) create mode 100644 src/uct/ib/dc/dc_mlx5.inl diff --git a/src/uct/ib/Makefile.am b/src/uct/ib/Makefile.am index 61f2cdd08e69..c7a93fdef195 100644 --- a/src/uct/ib/Makefile.am +++ b/src/uct/ib/Makefile.am @@ -101,6 +101,7 @@ endif # HAVE_TL_RC if HAVE_TL_DC noinst_HEADERS += \ dc/dc_mlx5_ep.h \ + dc/dc_mlx5.inl \ dc/dc_mlx5.h libuct_ib_la_SOURCES += \ diff --git a/src/uct/ib/dc/dc_mlx5.c b/src/uct/ib/dc/dc_mlx5.c index f64cd697535d..4963e5f59693 100644 --- a/src/uct/ib/dc/dc_mlx5.c +++ b/src/uct/ib/dc/dc_mlx5.c @@ -8,11 +8,11 @@ # include "config.h" #endif +#include "dc_mlx5.inl" #include "dc_mlx5.h" #include "dc_mlx5_ep.h" #include -#include #include #include #include @@ -226,11 +226,7 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface) iface, dci, qp_num, txqp, hw_ci); uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci); - - uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci)); - ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max); - - uct_rc_iface_update_reads(&iface->super.super); + uct_dc_mlx5_update_tx_res(iface, txwq, txqp, hw_ci); /** * Note: DCI is released after handling completion callbacks, @@ -1327,7 +1323,10 @@ uct_dc_mlx5_dci_keepalive_handle_failure(uct_dc_mlx5_iface_t *iface, ucs_mpool_put(op); reset_dci: - uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status); + uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len); + uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, + txwq->sw_pi, 0); + uct_dc_mlx5_iface_reset_dci(iface, dci); } ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface) @@ -1349,21 +1348,15 @@ ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface) return UCS_OK; } -void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci, - ucs_status_t ep_status) +void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci) { - uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md, - uct_ib_mlx5_md_t); - uct_ib_mlx5_txwq_t *txwq; - uct_rc_txqp_t *txqp; + uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md, + uct_ib_mlx5_md_t); + uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq; ucs_status_t status; ucs_debug("iface %p reset dci[%d]", iface, dci); - UCT_DC_MLX5_IFACE_TXQP_DCI_GET(iface, dci, txqp, txwq); - uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len); - uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, - txwq->sw_pi, 0); ucs_assert(txwq->super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS); /* Synchronize CQ index with the driver, since it would remove pending @@ -1402,14 +1395,30 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface, ucs_status_t status; ucs_log_level_t log_lvl; + if (ep->flags & (UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER | + UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL)) { + return; + } + if (ep_status == UCS_ERR_CANCELED) { return; } + if (ep == iface->tx.fc_ep) { + /* Cannot handle errors on flow-control endpoint. + * Or shall we ignore them? + */ + ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface, + ucs_status_string(ep_status)); + return; + } + status = uct_iface_handle_ep_err(&ib_iface->super.super, &ep->super.super, ep_status); log_lvl = uct_ib_iface_failure_log_level(ib_iface, status, ep_status); uct_ib_mlx5_completion_with_err(ib_iface, (uct_ib_mlx5_err_cqe_t*)cqe, txwq, log_lvl); + + ep->flags |= UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER; } diff --git a/src/uct/ib/dc/dc_mlx5.h b/src/uct/ib/dc/dc_mlx5.h index a0d1d23793b2..8ba435a038ea 100644 --- a/src/uct/ib/dc/dc_mlx5.h +++ b/src/uct/ib/dc/dc_mlx5.h @@ -257,9 +257,7 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface, uct_ib_mlx5_txwq_t *txwq, ucs_status_t ep_status); -void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, - uint8_t dci, - ucs_status_t ep_status); +void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci); #if HAVE_DEVX diff --git a/src/uct/ib/dc/dc_mlx5.inl b/src/uct/ib/dc/dc_mlx5.inl new file mode 100644 index 000000000000..3d670e9f8c6f --- /dev/null +++ b/src/uct/ib/dc/dc_mlx5.inl @@ -0,0 +1,22 @@ +/** +* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED. +* +* See file LICENSE for terms. +*/ + +#include "dc_mlx5.h" + +#include +#include "uct/ib/rc/base/rc_iface.h" +#include "uct/ib/rc/base/rc_ep.h" + + +static UCS_F_ALWAYS_INLINE void +uct_dc_mlx5_update_tx_res(uct_dc_mlx5_iface_t *iface, uct_ib_mlx5_txwq_t *txwq, + uct_rc_txqp_t *txqp, uint16_t hw_ci) +{ + uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci)); + ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max); + + uct_rc_iface_update_reads(&iface->super.super); +} diff --git a/src/uct/ib/dc/dc_mlx5_ep.c b/src/uct/ib/dc/dc_mlx5_ep.c index 622aa65e9486..93f0764ef9d0 100644 --- a/src/uct/ib/dc/dc_mlx5_ep.c +++ b/src/uct/ib/dc/dc_mlx5_ep.c @@ -8,10 +8,10 @@ # include "config.h" #endif +#include "dc_mlx5.inl" #include "dc_mlx5_ep.h" #include "dc_mlx5.h" -#include #include #define UCT_DC_MLX5_IFACE_TXQP_GET(_iface, _ep, _txqp, _txwq) \ @@ -554,6 +554,8 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, { uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); + uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md, + uct_ib_mlx5_md_t); ucs_status_t status; UCT_DC_MLX5_TXQP_DECL(txqp, txwq); @@ -562,14 +564,10 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, return UCS_ERR_UNSUPPORTED; } - uct_ep_pending_purge(tl_ep, NULL, 0); if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) { /* No dci -> no WQEs -> HW is clean, nothing to cancel */ return UCS_OK; } - - uct_dc_mlx5_ep_handle_failure(ep, NULL, UCS_ERR_CANCELED); - return UCS_OK; } if (!uct_dc_mlx5_iface_has_tx_resources(iface)) { @@ -600,6 +598,16 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq); + if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) { + ucs_assert(!(ep->flags & UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL)); + status = uct_ib_mlx5_modify_qp_state(md, &txwq->super, IBV_QPS_ERR); + if (status != UCS_OK) { + return status; + } + + ep->flags |= UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL; + } + return uct_rc_txqp_add_flush_comp(&iface->super.super, &ep->super, txqp, comp, txwq->sig_pi); } @@ -975,7 +983,6 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t) "iface (%p) ep (%p) dci leak detected: dci=%d", iface, self, self->dci); - /* TODO should be removed by flush */ uct_rc_txqp_purge_outstanding(&iface->super.super, &iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED, iface->tx.dcis[self->dci].txwq.sw_pi, 1); @@ -1310,34 +1317,25 @@ ucs_status_t uct_dc_mlx5_ep_check_fc(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_ void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg, ucs_status_t ep_status) { + struct mlx5_cqe64 *cqe = arg; uct_iface_h tl_iface = ep->super.super.iface; uint8_t dci = ep->dci; uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t); + uct_rc_txqp_t *txqp = &iface->tx.dcis[dci].txqp; uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq; + uint16_t pi = ntohs(cqe->wqe_counter); ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface)); - uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status); - /* since we removed all outstanding ops on the dci, it should be released */ + uct_dc_mlx5_update_tx_res(iface, txwq, txqp, pi); + uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, pi, 0); + ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI); uct_dc_mlx5_iface_dci_put(iface, dci); - ucs_assert_always(ep->dci == UCT_DC_MLX5_EP_NO_DCI); + uct_dc_mlx5_iface_set_ep_failed(iface, ep, cqe, txwq, ep_status); - if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) { - /* No need to wait for grant on this ep anymore */ - uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep); - } - - if (ep == iface->tx.fc_ep) { - ucs_assert(ep_status != UCS_ERR_CANCELED); - /* Cannot handle errors on flow-control endpoint. - * Or shall we ignore them? - */ - ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface, - ucs_status_string(ep_status)); - } else { - uct_dc_mlx5_iface_set_ep_failed(iface, ep, (struct mlx5_cqe64*)arg, - txwq, ep_status); + if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) { + uct_dc_mlx5_iface_reset_dci(iface, dci); } } diff --git a/src/uct/ib/dc/dc_mlx5_ep.h b/src/uct/ib/dc/dc_mlx5_ep.h index 4650df9df7e5..cc4fe6a73920 100644 --- a/src/uct/ib/dc/dc_mlx5_ep.h +++ b/src/uct/ib/dc/dc_mlx5_ep.h @@ -29,10 +29,17 @@ enum uct_dc_mlx5_ep_flags { * 1) Grant arrives for the recently deleted ep. * 2) QP resources are available, but there are some pending requests. */ UCT_DC_MLX5_EP_FLAG_FC_WAIT_FOR_GRANT = UCS_BIT(3), + /* Keepalive Request scheduled: indicates that keepalive request * is scheduled in outstanding queue and no more keepalive actions * are needed */ - UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4) + UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4), + + /* Flush cancel was executed on EP */ + UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL = UCS_BIT(5), + + /* Error handler already called or flush(CANCEL) disabled it */ + UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER = UCS_BIT(6), }; @@ -391,6 +398,11 @@ uct_dc_mlx5_iface_dci_put(uct_dc_mlx5_iface_t *iface, uint8_t dci) return; } + if ((ep->flags & UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL) && + !(ep->flags & UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER)) { + uct_dc_mlx5_iface_reset_dci(iface, dci); + } + uct_dc_mlx5_iface_dci_release(iface, dci); ucs_assert(uct_dc_mlx5_ep_from_dci(iface, dci)->dci != UCT_DC_MLX5_EP_NO_DCI); @@ -412,7 +424,9 @@ static inline void uct_dc_mlx5_iface_dci_alloc(uct_dc_mlx5_iface_t *iface, uct_d * dci must have resources to transmit. */ ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface)); - ep->dci = iface->tx.dcis_stack[iface->tx.stack_top]; + ep->dci = iface->tx.dcis_stack[iface->tx.stack_top]; + ep->flags &= ~UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL; + ep->flags &= ~UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER; ucs_assert(ep->dci < iface->tx.ndci); ucs_assert(uct_dc_mlx5_ep_from_dci(iface, ep->dci) == NULL); ucs_assert(iface->tx.dcis[ep->dci].flags == 0); diff --git a/test/gtest/uct/test_flush.cc b/test/gtest/uct/test_flush.cc index 58c6dda79d5c..91a3d5d5bc81 100644 --- a/test/gtest/uct/test_flush.cc +++ b/test/gtest/uct/test_flush.cc @@ -569,6 +569,8 @@ class uct_cancel_test : public uct_test { } void connect() { + m_e->destroy_eps(); + m_peer->m_e->destroy_eps(); m_e->connect(0, *m_peer->m_e, 0); m_peer->m_e->connect(0, *m_e, 0); } @@ -636,7 +638,7 @@ class uct_cancel_test : public uct_test { recvbuf.addr(), recvbuf.rkey(), NULL); } - void flush_and_reconnect() { + int flush_and_reconnect() { std::list flushing; ucs_status_t status = UCS_OK; uct_completion_t done; @@ -647,7 +649,7 @@ class uct_cancel_test : public uct_test { done.count = flushing.size() + 1; done.status = UCS_OK; done.func = NULL; - ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(50.0); + ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(200.0); while (!flushing.empty() && (ucs_get_time() < loop_end_limit)) { std::list::iterator iter = flushing.begin(); while (iter != flushing.end()) { @@ -666,16 +668,20 @@ class uct_cancel_test : public uct_test { short_progress_loop(); } ASSERT_UCS_OK_OR_INPROGRESS(status); + double holdup = 200.0 - ucs_time_to_sec(loop_end_limit - ucs_get_time()); + if (holdup > 10.0) { + UCS_TEST_MESSAGE << "flush took " << holdup << " sec"; + } /* coverity[loop_condition] */ while (done.count != 1) { progress(); } - m_s1->m_e->destroy_eps(); - m_s1->m_e->connect(0, *m_s0->m_e, 0); + m_s0->connect(); EXPECT_EQ(0, m_err_count); + return holdup / 10.0; } typedef ucs_status_t (uct_cancel_test::* send_func_t)(peer *s); @@ -707,7 +713,7 @@ class uct_cancel_test : public uct_test { void do_test(send_func_t send) { for (int i = 0; i < count(); ++i) { fill(send); - flush_and_reconnect(); + i += flush_and_reconnect(); } } @@ -763,9 +769,9 @@ class uct_cancel_test : public uct_test { } void check_skip_test_tl() { - const resource *r = dynamic_cast(GetParam()); - - if ((r->tl_name != "rc_mlx5") && (r->tl_name != "rc_verbs")) { + if ((GetParam()->tl_name != "dc_mlx5") && + (GetParam()->tl_name != "rc_verbs") && + (GetParam()->tl_name != "rc_mlx5")) { UCS_TEST_SKIP_R("not supported yet"); }