Skip to content

Commit

Permalink
UCT/DC: Flush TX
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemy-Mellanox committed Nov 25, 2020
1 parent 0f427f4 commit dbcc5f2
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 54 deletions.
1 change: 1 addition & 0 deletions src/uct/ib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ endif # HAVE_TL_RC
if HAVE_TL_DC
noinst_HEADERS += \
dc/dc_mlx5_ep.h \
dc/dc_mlx5.inl \
dc/dc_mlx5.h

libuct_ib_la_SOURCES += \
Expand Down
43 changes: 26 additions & 17 deletions src/uct/ib/dc/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
# include "config.h"
#endif

#include "dc_mlx5.inl"
#include "dc_mlx5.h"
#include "dc_mlx5_ep.h"

#include <uct/api/uct.h>
#include <uct/ib/rc/accel/rc_mlx5.inl>
#include <uct/ib/base/ib_device.h>
#include <uct/ib/base/ib_log.h>
#include <uct/ib/mlx5/ib_mlx5_log.h>
Expand Down Expand Up @@ -226,11 +226,7 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface)
iface, dci, qp_num, txqp, hw_ci);

uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci);

uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci));
ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max);

uct_rc_iface_update_reads(&iface->super.super);
uct_dc_mlx5_update_tx_res(iface, txwq, txqp, hw_ci);

/**
* Note: DCI is released after handling completion callbacks,
Expand Down Expand Up @@ -1327,7 +1323,10 @@ uct_dc_mlx5_dci_keepalive_handle_failure(uct_dc_mlx5_iface_t *iface,
ucs_mpool_put(op);

reset_dci:
uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status);
uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len);
uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status,
txwq->sw_pi, 0);
uct_dc_mlx5_iface_reset_dci(iface, dci);
}

ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface)
Expand All @@ -1349,21 +1348,15 @@ ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface)
return UCS_OK;
}

void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci,
ucs_status_t ep_status)
void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci)
{
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md,
uct_ib_mlx5_md_t);
uct_ib_mlx5_txwq_t *txwq;
uct_rc_txqp_t *txqp;
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md,
uct_ib_mlx5_md_t);
uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq;
ucs_status_t status;

ucs_debug("iface %p reset dci[%d]", iface, dci);

UCT_DC_MLX5_IFACE_TXQP_DCI_GET(iface, dci, txqp, txwq);
uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len);
uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status,
txwq->sw_pi, 0);
ucs_assert(txwq->super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS);

/* Synchronize CQ index with the driver, since it would remove pending
Expand Down Expand Up @@ -1402,14 +1395,30 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface,
ucs_status_t status;
ucs_log_level_t log_lvl;

if (ep->flags & (UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER |
UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL)) {
return;
}

if (ep_status == UCS_ERR_CANCELED) {
return;
}

if (ep == iface->tx.fc_ep) {
/* Cannot handle errors on flow-control endpoint.
* Or shall we ignore them?
*/
ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface,
ucs_status_string(ep_status));
return;
}

status = uct_iface_handle_ep_err(&ib_iface->super.super,
&ep->super.super, ep_status);
log_lvl = uct_ib_iface_failure_log_level(ib_iface, status, ep_status);
uct_ib_mlx5_completion_with_err(ib_iface, (uct_ib_mlx5_err_cqe_t*)cqe,
txwq, log_lvl);

ep->flags |= UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER;
}

4 changes: 1 addition & 3 deletions src/uct/ib/dc/dc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,7 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface,
uct_ib_mlx5_txwq_t *txwq,
ucs_status_t ep_status);

void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface,
uint8_t dci,
ucs_status_t ep_status);
void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci);

#if HAVE_DEVX

Expand Down
22 changes: 22 additions & 0 deletions src/uct/ib/dc/dc_mlx5.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "dc_mlx5.h"

#include <uct/ib/rc/accel/rc_mlx5.inl>
#include "uct/ib/rc/base/rc_iface.h"
#include "uct/ib/rc/base/rc_ep.h"


static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_update_tx_res(uct_dc_mlx5_iface_t *iface, uct_ib_mlx5_txwq_t *txwq,
uct_rc_txqp_t *txqp, uint16_t hw_ci)
{
uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci));
ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max);

uct_rc_iface_update_reads(&iface->super.super);
}
46 changes: 22 additions & 24 deletions src/uct/ib/dc/dc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
# include "config.h"
#endif

#include "dc_mlx5.inl"
#include "dc_mlx5_ep.h"
#include "dc_mlx5.h"

#include <uct/ib/rc/accel/rc_mlx5.inl>
#include <uct/ib/mlx5/ib_mlx5_log.h>

#define UCT_DC_MLX5_IFACE_TXQP_GET(_iface, _ep, _txqp, _txwq) \
Expand Down Expand Up @@ -554,6 +554,8 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
{
uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md,
uct_ib_mlx5_md_t);
ucs_status_t status;
UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

Expand All @@ -562,14 +564,10 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
return UCS_ERR_UNSUPPORTED;
}

uct_ep_pending_purge(tl_ep, NULL, 0);
if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
/* No dci -> no WQEs -> HW is clean, nothing to cancel */
return UCS_OK;
}

uct_dc_mlx5_ep_handle_failure(ep, NULL, UCS_ERR_CANCELED);
return UCS_OK;
}

if (!uct_dc_mlx5_iface_has_tx_resources(iface)) {
Expand Down Expand Up @@ -600,6 +598,16 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,

UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
ucs_assert(!(ep->flags & UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL));
status = uct_ib_mlx5_modify_qp_state(md, &txwq->super, IBV_QPS_ERR);
if (status != UCS_OK) {
return status;
}

ep->flags |= UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL;
}

return uct_rc_txqp_add_flush_comp(&iface->super.super, &ep->super, txqp,
comp, txwq->sig_pi);
}
Expand Down Expand Up @@ -975,7 +983,6 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t)
"iface (%p) ep (%p) dci leak detected: dci=%d", iface,
self, self->dci);

/* TODO should be removed by flush */
uct_rc_txqp_purge_outstanding(&iface->super.super,
&iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED,
iface->tx.dcis[self->dci].txwq.sw_pi, 1);
Expand Down Expand Up @@ -1310,34 +1317,25 @@ ucs_status_t uct_dc_mlx5_ep_check_fc(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_
void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg,
ucs_status_t ep_status)
{
struct mlx5_cqe64 *cqe = arg;
uct_iface_h tl_iface = ep->super.super.iface;
uint8_t dci = ep->dci;
uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
uct_rc_txqp_t *txqp = &iface->tx.dcis[dci].txqp;
uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq;
uint16_t pi = ntohs(cqe->wqe_counter);

ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));
uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status);

/* since we removed all outstanding ops on the dci, it should be released */
uct_dc_mlx5_update_tx_res(iface, txwq, txqp, pi);
uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, pi, 0);

ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI);
uct_dc_mlx5_iface_dci_put(iface, dci);
ucs_assert_always(ep->dci == UCT_DC_MLX5_EP_NO_DCI);
uct_dc_mlx5_iface_set_ep_failed(iface, ep, cqe, txwq, ep_status);

if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
/* No need to wait for grant on this ep anymore */
uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep);
}

if (ep == iface->tx.fc_ep) {
ucs_assert(ep_status != UCS_ERR_CANCELED);
/* Cannot handle errors on flow-control endpoint.
* Or shall we ignore them?
*/
ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface,
ucs_status_string(ep_status));
} else {
uct_dc_mlx5_iface_set_ep_failed(iface, ep, (struct mlx5_cqe64*)arg,
txwq, ep_status);
if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
uct_dc_mlx5_iface_reset_dci(iface, dci);
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/uct/ib/dc/dc_mlx5_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,17 @@ enum uct_dc_mlx5_ep_flags {
* 1) Grant arrives for the recently deleted ep.
* 2) QP resources are available, but there are some pending requests. */
UCT_DC_MLX5_EP_FLAG_FC_WAIT_FOR_GRANT = UCS_BIT(3),

/* Keepalive Request scheduled: indicates that keepalive request
* is scheduled in outstanding queue and no more keepalive actions
* are needed */
UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4)
UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4),

/* Flush cancel was executed on EP */
UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL = UCS_BIT(5),

/* Error handler already called or flush(CANCEL) disabled it */
UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER = UCS_BIT(6),
};


Expand Down Expand Up @@ -391,6 +398,11 @@ uct_dc_mlx5_iface_dci_put(uct_dc_mlx5_iface_t *iface, uint8_t dci)
return;
}

if ((ep->flags & UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL) &&
!(ep->flags & UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER)) {
uct_dc_mlx5_iface_reset_dci(iface, dci);
}

uct_dc_mlx5_iface_dci_release(iface, dci);

ucs_assert(uct_dc_mlx5_ep_from_dci(iface, dci)->dci != UCT_DC_MLX5_EP_NO_DCI);
Expand All @@ -412,7 +424,9 @@ static inline void uct_dc_mlx5_iface_dci_alloc(uct_dc_mlx5_iface_t *iface, uct_d
* dci must have resources to transmit.
*/
ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));
ep->dci = iface->tx.dcis_stack[iface->tx.stack_top];
ep->dci = iface->tx.dcis_stack[iface->tx.stack_top];
ep->flags &= ~UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL;
ep->flags &= ~UCT_DC_MLX5_EP_FLAG_NO_ERR_HANDLER;
ucs_assert(ep->dci < iface->tx.ndci);
ucs_assert(uct_dc_mlx5_ep_from_dci(iface, ep->dci) == NULL);
ucs_assert(iface->tx.dcis[ep->dci].flags == 0);
Expand Down
22 changes: 14 additions & 8 deletions test/gtest/uct/test_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ class uct_cancel_test : public uct_test {
}

void connect() {
m_e->destroy_eps();
m_peer->m_e->destroy_eps();
m_e->connect(0, *m_peer->m_e, 0);
m_peer->m_e->connect(0, *m_e, 0);
}
Expand Down Expand Up @@ -636,7 +638,7 @@ class uct_cancel_test : public uct_test {
recvbuf.addr(), recvbuf.rkey(), NULL);
}

void flush_and_reconnect() {
int flush_and_reconnect() {
std::list<entity *> flushing;
ucs_status_t status = UCS_OK;
uct_completion_t done;
Expand All @@ -647,7 +649,7 @@ class uct_cancel_test : public uct_test {
done.count = flushing.size() + 1;
done.status = UCS_OK;
done.func = NULL;
ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(50.0);
ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(200.0);
while (!flushing.empty() && (ucs_get_time() < loop_end_limit)) {
std::list<entity *>::iterator iter = flushing.begin();
while (iter != flushing.end()) {
Expand All @@ -666,16 +668,20 @@ class uct_cancel_test : public uct_test {
short_progress_loop();
}
ASSERT_UCS_OK_OR_INPROGRESS(status);
double holdup = 200.0 - ucs_time_to_sec(loop_end_limit - ucs_get_time());
if (holdup > 10.0) {
UCS_TEST_MESSAGE << "flush took " << holdup << " sec";
}

/* coverity[loop_condition] */
while (done.count != 1) {
progress();
}

m_s1->m_e->destroy_eps();
m_s1->m_e->connect(0, *m_s0->m_e, 0);
m_s0->connect();

EXPECT_EQ(0, m_err_count);
return holdup / 10.0;
}

typedef ucs_status_t (uct_cancel_test::* send_func_t)(peer *s);
Expand Down Expand Up @@ -707,7 +713,7 @@ class uct_cancel_test : public uct_test {
void do_test(send_func_t send) {
for (int i = 0; i < count(); ++i) {
fill(send);
flush_and_reconnect();
i += flush_and_reconnect();
}
}

Expand Down Expand Up @@ -763,9 +769,9 @@ class uct_cancel_test : public uct_test {
}

void check_skip_test_tl() {
const resource *r = dynamic_cast<const resource*>(GetParam());

if ((r->tl_name != "rc_mlx5") && (r->tl_name != "rc_verbs")) {
if ((GetParam()->tl_name != "dc_mlx5") &&
(GetParam()->tl_name != "rc_verbs") &&
(GetParam()->tl_name != "rc_mlx5")) {
UCS_TEST_SKIP_R("not supported yet");
}

Expand Down

0 comments on commit dbcc5f2

Please sign in to comment.