Skip to content

Commit

Permalink
Merge pull request #5872 from Artemy-Mellanox/topic/flush_cancel_nb-2
Browse files Browse the repository at this point in the history
UCT/DC: Flush TX
  • Loading branch information
yosefe authored Dec 17, 2020
2 parents 3386ed7 + b609ad1 commit a58c494
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 135 deletions.
1 change: 1 addition & 0 deletions src/uct/ib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ endif # HAVE_TL_RC
if HAVE_TL_DC
noinst_HEADERS += \
dc/dc_mlx5_ep.h \
dc/dc_mlx5.inl \
dc/dc_mlx5.h

libuct_ib_la_SOURCES += \
Expand Down
41 changes: 24 additions & 17 deletions src/uct/ib/dc/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
# include "config.h"
#endif

#include "dc_mlx5.inl"
#include "dc_mlx5.h"
#include "dc_mlx5_ep.h"

#include <uct/api/uct.h>
#include <uct/ib/rc/accel/rc_mlx5.inl>
#include <uct/ib/base/ib_device.h>
#include <uct/ib/base/ib_log.h>
#include <uct/ib/mlx5/ib_mlx5_log.h>
Expand Down Expand Up @@ -226,11 +226,7 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface)
iface, dci, qp_num, txqp, hw_ci);

uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci);

uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci));
ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max);

uct_rc_iface_update_reads(&iface->super.super);
uct_dc_mlx5_update_tx_res(iface, txwq, txqp, hw_ci);

/**
* Note: DCI is released after handling completion callbacks,
Expand Down Expand Up @@ -1327,7 +1323,10 @@ uct_dc_mlx5_dci_keepalive_handle_failure(uct_dc_mlx5_iface_t *iface,
ucs_mpool_put(op);

reset_dci:
uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status);
uct_rc_txqp_available_set(txqp, iface->super.super.config.tx_qp_len);
uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status,
txwq->sw_pi, 0);
uct_dc_mlx5_iface_reset_dci(iface, dci);
}

ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface)
Expand All @@ -1349,21 +1348,15 @@ ucs_status_t uct_dc_mlx5_iface_keepalive_init(uct_dc_mlx5_iface_t *iface)
return UCS_OK;
}

void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci,
ucs_status_t ep_status)
void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci)
{
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md,
uct_ib_mlx5_md_t);
uct_ib_mlx5_txwq_t *txwq;
uct_rc_txqp_t *txqp;
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md,
uct_ib_mlx5_md_t);
uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq;
ucs_status_t status;

ucs_debug("iface %p reset dci[%d]", iface, dci);

UCT_DC_MLX5_IFACE_TXQP_DCI_GET(iface, dci, txqp, txwq);
uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len);
uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status,
txwq->sw_pi, 0);
ucs_assert(txwq->super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS);

/* Synchronize CQ index with the driver, since it would remove pending
Expand Down Expand Up @@ -1402,14 +1395,28 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface,
ucs_status_t status;
ucs_log_level_t log_lvl;

if (ep->flags & (UCT_DC_MLX5_EP_FLAG_ERR_HANDLER_INVOKED |
UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL)) {
return;
}

if (ep_status == UCS_ERR_CANCELED) {
return;
}

if (ep == iface->tx.fc_ep) {
/* Do not report errors on flow control endpoint */
ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface,
ucs_status_string(ep_status));
return;
}

status = uct_iface_handle_ep_err(&ib_iface->super.super,
&ep->super.super, ep_status);
log_lvl = uct_ib_iface_failure_log_level(ib_iface, status, ep_status);
uct_ib_mlx5_completion_with_err(ib_iface, (uct_ib_mlx5_err_cqe_t*)cqe,
txwq, log_lvl);

ep->flags |= UCT_DC_MLX5_EP_FLAG_ERR_HANDLER_INVOKED;
}

4 changes: 1 addition & 3 deletions src/uct/ib/dc/dc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,7 @@ void uct_dc_mlx5_iface_set_ep_failed(uct_dc_mlx5_iface_t *iface,
uct_ib_mlx5_txwq_t *txwq,
ucs_status_t ep_status);

void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface,
uint8_t dci,
ucs_status_t ep_status);
void uct_dc_mlx5_iface_reset_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci);

#if HAVE_DEVX

Expand Down
22 changes: 22 additions & 0 deletions src/uct/ib/dc/dc_mlx5.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "dc_mlx5.h"

#include <uct/ib/rc/accel/rc_mlx5.inl>
#include "uct/ib/rc/base/rc_iface.h"
#include "uct/ib/rc/base/rc_ep.h"


static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_update_tx_res(uct_dc_mlx5_iface_t *iface, uct_ib_mlx5_txwq_t *txwq,
uct_rc_txqp_t *txqp, uint16_t hw_ci)
{
uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci));
ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max);

uct_rc_iface_update_reads(&iface->super.super);
}
101 changes: 63 additions & 38 deletions src/uct/ib/dc/dc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
# include "config.h"
#endif

#include "dc_mlx5.inl"
#include "dc_mlx5_ep.h"
#include "dc_mlx5.h"

#include <uct/ib/rc/accel/rc_mlx5.inl>
#include <uct/ib/mlx5/ib_mlx5_log.h>

#define UCT_DC_MLX5_IFACE_TXQP_GET(_iface, _ep, _txqp, _txwq) \
Expand Down Expand Up @@ -554,24 +554,11 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
{
uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.super.md,
uct_ib_mlx5_md_t);
ucs_status_t status;
UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
return UCS_ERR_UNSUPPORTED;
}

uct_ep_pending_purge(tl_ep, NULL, 0);
if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
/* No dci -> no WQEs -> HW is clean, nothing to cancel */
return UCS_OK;
}

uct_dc_mlx5_ep_handle_failure(ep, NULL, UCS_ERR_CANCELED);
return UCS_OK;
}

if (!uct_dc_mlx5_iface_has_tx_resources(iface)) {
return UCS_ERR_NO_RESOURCE;
}
Expand Down Expand Up @@ -600,6 +587,21 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,

UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
ucs_assert(ucs_arbiter_group_is_empty(&ep->arb_group));
ucs_assert(!(ep->flags & UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL));
if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
return UCS_ERR_UNSUPPORTED;
}

status = uct_ib_mlx5_modify_qp_state(md, &txwq->super, IBV_QPS_ERR);
if (status != UCS_OK) {
return status;
}

ep->flags |= UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL;
}

return uct_rc_txqp_add_flush_comp(&iface->super.super, &ep->super, txqp,
comp, txwq->sig_pi);
}
Expand Down Expand Up @@ -826,6 +828,7 @@ ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
uct_dc_mlx5_iface_t);
uct_ib_iface_t *ib_iface = &iface->super.super.super;
struct ibv_ah_attr ah_attr = {.is_global = 0};
uct_rc_iface_send_op_t *grant_ep;
uct_dc_fc_sender_data_t sender;
uct_dc_fc_request_t *dc_req;
struct mlx5_wqe_av mlx5_av;
Expand Down Expand Up @@ -892,9 +895,18 @@ ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
sender.global.gid = ib_iface->gid_info.gid;
sender.global.is_global = dc_ep->flags & UCT_DC_MLX5_EP_FLAG_GRH;

grant_ep = ucs_mpool_get(&iface->super.super.tx.send_op_mp);
if (ucs_unlikely(grant_ep == NULL)) {
ucs_error("failed to allocate fc hard req grant op");
return UCS_ERR_NO_MEMORY;
}

UCS_STATS_UPDATE_COUNTER(dc_ep->fc.stats,
UCT_RC_FC_STAT_TX_HARD_REQ, 1);

uct_rc_ep_init_send_op(grant_ep, UCT_RC_IFACE_SEND_OP_FLAG_FC_GRANT,
NULL, (uct_rc_send_handler_t)ucs_mpool_put);
grant_ep->ep = tl_ep;
uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
txqp, txwq, MLX5_OPCODE_SEND_IMM,
&sender.global, sizeof(sender.global), op, sender.ep,
Expand All @@ -904,6 +916,7 @@ ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
uct_dc_mlx5_ep_get_grh(dc_ep),
uct_ib_mlx5_wqe_av_size(&dc_ep->av),
MLX5_WQE_CTRL_SOLICITED, INT_MAX);
uct_rc_txqp_add_send_op_sn(txqp, grant_ep, txwq->prev_sw_pi);
}

return UCS_OK;
Expand Down Expand Up @@ -932,6 +945,28 @@ static void uct_dc_mlx5_ep_keepalive_cleanup(uct_dc_mlx5_ep_t *ep)
}
}

static void uct_dc_mlx5_txqp_purge_outstanding(uct_dc_mlx5_iface_t *iface,
uct_rc_txqp_t *txqp,
ucs_status_t status,
uint16_t sn, int warn)
{
uct_rc_iface_send_op_t *op;
uct_dc_mlx5_ep_t *ep;

ucs_queue_for_each_extract(op, &txqp->outstanding, queue,
UCS_CIRCULAR_COMPARE16(op->sn, <=, sn)) {
if (op->flags & UCT_RC_IFACE_SEND_OP_FLAG_FC_GRANT) {
/* grant operation aborted so need to clear "wait-for-grant" flag */
ep = ucs_derived_of(op->ep, uct_dc_mlx5_ep_t);
uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep);
ucs_mpool_put(op);
} else {
uct_rc_txqp_purge_outstanding_op(&iface->super.super, op, status,
warn);
}
}
}

UCS_CLASS_INIT_FUNC(uct_dc_mlx5_ep_t, uct_dc_mlx5_iface_t *iface,
const uct_dc_mlx5_iface_addr_t *if_addr,
uct_ib_mlx5_base_av_t *av)
Expand Down Expand Up @@ -975,10 +1010,9 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t)
"iface (%p) ep (%p) dci leak detected: dci=%d", iface,
self, self->dci);

/* TODO should be removed by flush */
uct_rc_txqp_purge_outstanding(&iface->super.super,
&iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED,
iface->tx.dcis[self->dci].txwq.sw_pi, 1);
uct_dc_mlx5_txqp_purge_outstanding(iface, &iface->tx.dcis[self->dci].txqp,
UCS_ERR_CANCELED,
iface->tx.dcis[self->dci].txwq.sw_pi, 1);
ucs_assert(ucs_queue_is_empty(&iface->tx.dcis[self->dci].txqp.outstanding));
iface->tx.dcis[self->dci].ep = NULL;
}
Expand Down Expand Up @@ -1310,34 +1344,25 @@ ucs_status_t uct_dc_mlx5_ep_check_fc(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_
void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg,
ucs_status_t ep_status)
{
struct mlx5_cqe64 *cqe = arg;
uct_iface_h tl_iface = ep->super.super.iface;
uint8_t dci = ep->dci;
uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
uct_rc_txqp_t *txqp = &iface->tx.dcis[dci].txqp;
uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq;
uint16_t pi = ntohs(cqe->wqe_counter);

ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));
uct_dc_mlx5_iface_reset_dci(iface, dci, ep_status);

/* since we removed all outstanding ops on the dci, it should be released */
uct_dc_mlx5_update_tx_res(iface, txwq, txqp, pi);
uct_dc_mlx5_txqp_purge_outstanding(iface, txqp, ep_status, pi, 0);

ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI);
uct_dc_mlx5_iface_dci_put(iface, dci);
ucs_assert_always(ep->dci == UCT_DC_MLX5_EP_NO_DCI);

if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
/* No need to wait for grant on this ep anymore */
uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep);
}
uct_dc_mlx5_iface_set_ep_failed(iface, ep, cqe, txwq, ep_status);

if (ep == iface->tx.fc_ep) {
ucs_assert(ep_status != UCS_ERR_CANCELED);
/* Cannot handle errors on flow-control endpoint.
* Or shall we ignore them?
*/
ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface,
ucs_status_string(ep_status));
} else {
uct_dc_mlx5_iface_set_ep_failed(iface, ep, (struct mlx5_cqe64*)arg,
txwq, ep_status);
if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
uct_dc_mlx5_iface_reset_dci(iface, dci);
}
}

Expand Down
23 changes: 15 additions & 8 deletions src/uct/ib/dc/dc_mlx5_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,29 @@


enum uct_dc_mlx5_ep_flags {
UCT_DC_MLX5_EP_FLAG_TX_WAIT = UCS_BIT(0), /* ep is in the tx_wait state. See
description of the dcs+quota dci
selection policy above */
UCT_DC_MLX5_EP_FLAG_GRH = UCS_BIT(1), /* ep has GRH address. Used by
dc_mlx5 endpoint */
UCT_DC_MLX5_EP_FLAG_VALID = UCS_BIT(2), /* ep is a valid endpoint */
UCT_DC_MLX5_EP_FLAG_TX_WAIT = UCS_BIT(0), /* ep is in the tx_wait state. See
description of the dcs+quota dci
selection policy above */
UCT_DC_MLX5_EP_FLAG_GRH = UCS_BIT(1), /* ep has GRH address. Used by
dc_mlx5 endpoint */
UCT_DC_MLX5_EP_FLAG_VALID = UCS_BIT(2), /* ep is a valid endpoint */
/* Indicates that FC grant has been requested, but is not received yet.
* Flush will not complete until an outgoing grant request is acked.
* It is needed to avoid the following cases:
* 1) Grant arrives for the recently deleted ep.
* 2) QP resources are available, but there are some pending requests. */
UCT_DC_MLX5_EP_FLAG_FC_WAIT_FOR_GRANT = UCS_BIT(3),
UCT_DC_MLX5_EP_FLAG_FC_WAIT_FOR_GRANT = UCS_BIT(3),

/* Keepalive Request scheduled: indicates that keepalive request
* is scheduled in outstanding queue and no more keepalive actions
* are needed */
UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4)
UCT_DC_MLX5_EP_FLAG_KEEPALIVE_POSTED = UCS_BIT(4),

/* Flush cancel was executed on EP */
UCT_DC_MLX5_EP_FLAG_FLUSH_CANCEL = UCS_BIT(5),

/* Error handler already called or flush(CANCEL) disabled it */
UCT_DC_MLX5_EP_FLAG_ERR_HANDLER_INVOKED = UCS_BIT(6),
};


Expand Down
4 changes: 2 additions & 2 deletions src/uct/ib/rc/accel/rc_mlx5_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,12 @@ uct_rc_mlx5_iface_handle_failure(uct_ib_iface_t *ib_iface, void *arg,
uct_rc_mlx5_common_update_tx_res(iface, &ep->tx.wq, &ep->super.txqp, pi);
uct_rc_txqp_purge_outstanding(iface, &ep->super.txqp, ep_status, pi, 0);

if (ep->super.flags & (UCT_RC_EP_FLAG_NO_ERR_HANDLER |
if (ep->super.flags & (UCT_RC_EP_FLAG_ERR_HANDLER_INVOKED |
UCT_RC_EP_FLAG_FLUSH_CANCEL)) {
return;
}

ep->super.flags |= UCT_RC_EP_FLAG_NO_ERR_HANDLER;
ep->super.flags |= UCT_RC_EP_FLAG_ERR_HANDLER_INVOKED;

status = uct_iface_handle_ep_err(&iface->super.super.super,
&ep->super.super.super, ep_status);
Expand Down
Loading

0 comments on commit a58c494

Please sign in to comment.