Skip to content

Commit

Permalink
UCT/RC: Deferred release of READ credits
Browse files Browse the repository at this point in the history
  • Loading branch information
brminich committed Sep 29, 2020
1 parent 4ec1bcf commit e1228f9
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 33 deletions.
9 changes: 6 additions & 3 deletions src/uct/ib/dc/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,18 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface)
ucs_trace_poll("dc iface %p tx_cqe: dci[%d] qpn 0x%x txqp %p hw_ci %d",
iface, dci, qp_num, txqp, hw_ci);

uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci);

uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci));
ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max);

uct_rc_iface_update_reads(&iface->super.super);

uct_dc_mlx5_iface_dci_put(iface, dci);
/* process pending elements prior to CQ entries to
* avoid out-of-order transmission in completion
* callbacks */
uct_dc_mlx5_iface_progress_pending(iface);
uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci);
return 1;
}

Expand Down Expand Up @@ -377,7 +380,7 @@ static ucs_status_t uct_dc_mlx5_iface_create_qp(uct_dc_mlx5_iface_t *iface,
return UCS_OK;

err:
uct_rc_txqp_cleanup(&dci->txqp);
uct_rc_txqp_cleanup(&iface->super.super, &dci->txqp);
err_qp:
ibv_destroy_qp(dci->txwq.super.verbs.qp);
return status;
Expand Down Expand Up @@ -767,7 +770,7 @@ void uct_dc_mlx5_iface_dcis_destroy(uct_dc_mlx5_iface_t *iface, int max)
{
int i;
for (i = 0; i < max; i++) {
uct_rc_txqp_cleanup(&iface->tx.dcis[i].txqp);
uct_rc_txqp_cleanup(&iface->super.super, &iface->tx.dcis[i].txqp);
ucs_assert(iface->tx.dcis[i].txwq.super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS);
uct_ib_destroy_qp(iface->tx.dcis[i].txwq.super.verbs.qp);
}
Expand Down
6 changes: 4 additions & 2 deletions src/uct/ib/dc/dc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,9 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t)
ucs_debug("ep (%p) is destroyed with %d outstanding ops",
self, (int16_t)iface->super.super.config.tx_qp_len -
uct_rc_txqp_available(&iface->tx.dcis[self->dci].txqp));
uct_rc_txqp_purge_outstanding(&iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED, 1);
uct_rc_txqp_purge_outstanding(&iface->super.super,
&iface->tx.dcis[self->dci].txqp,
UCS_ERR_CANCELED, 1);
iface->tx.dcis[self->dci].ep = NULL;
}

Expand Down Expand Up @@ -1281,7 +1283,7 @@ void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg,

ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));

uct_rc_txqp_purge_outstanding(txqp, ep_status, 0);
uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, 0);

/* poll_cqe for mlx5 returns NULL in case of failure and the cq_avaialble
is not updated for the error cqe and all outstanding wqes*/
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/dc/dc_mlx5_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ static inline struct mlx5_grh_av *uct_dc_mlx5_ep_get_grh(uct_dc_mlx5_ep_t *ep)
* available TX resources. */
#define UCT_DC_CHECK_RES_AND_FC(_iface, _ep) \
{ \
UCT_RC_CHECK_NUM_RDMA_READ(&(_iface)->super.super) \
if (ucs_unlikely((_ep)->fc.fc_wnd <= \
(_iface)->super.super.config.fc_hard_thresh)) { \
ucs_status_t _status = uct_dc_mlx5_ep_check_fc(_iface, _ep); \
Expand Down
2 changes: 2 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5.inl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ uct_rc_mlx5_common_update_tx_res(uct_rc_iface_t *rc_iface, uct_ib_mlx5_txwq_t *t
uct_rc_txqp_available_add(txqp, bb_num);
ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max);

uct_rc_iface_update_reads(rc_iface);

rc_iface->tx.cq_available += bb_num;
ucs_assertv(rc_iface->tx.cq_available <= rc_iface->config.tx_cq_len,
"cq_available=%d tx_cq_len=%d bb_num=%d txwq=%p txqp=%p",
Expand Down
3 changes: 2 additions & 1 deletion src/uct/ib/rc/accel/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,8 @@ ucs_status_t uct_rc_mlx5_ep_handle_failure(uct_rc_mlx5_ep_t *ep,
uct_rc_mlx5_iface_common_t);
int iface_res_released;

iface_res_released = uct_rc_txqp_purge_outstanding(&ep->super.txqp, status, 0);
iface_res_released = uct_rc_txqp_purge_outstanding(&iface->super,
&ep->super.txqp, status, 0);

if (cqe != NULL) {
uct_rc_mlx5_common_update_tx_res(&iface->super, &ep->tx.wq,
Expand Down
3 changes: 2 additions & 1 deletion src/uct/ib/rc/accel/rc_mlx5_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_common_t *iface)
ucs_trace_poll("rc_mlx5 iface %p tx_cqe: ep %p qpn 0x%x hw_ci %d", iface, ep,
qp_num, hw_ci);

uct_rc_mlx5_txqp_process_tx_cqe(&ep->super.txqp, cqe, hw_ci);

uct_rc_mlx5_common_update_tx_res(&iface->super, &ep->tx.wq, &ep->super.txqp,
hw_ci);

Expand All @@ -135,7 +137,6 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_common_t *iface)
ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending,
NULL);

uct_rc_mlx5_txqp_process_tx_cqe(&ep->super.txqp, cqe, hw_ci);
ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending,
NULL);

Expand Down
20 changes: 12 additions & 8 deletions src/uct/ib/rc/base/rc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface,
stats_parent, "-0x%x", qp_num);
}

void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp)
void uct_rc_txqp_cleanup(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp)
{
uct_rc_txqp_purge_outstanding(txqp, UCS_ERR_CANCELED, 1);
uct_rc_txqp_purge_outstanding(iface, txqp, UCS_ERR_CANCELED, 1);
UCS_STATS_NODE_FREE(txqp->stats);
}

Expand Down Expand Up @@ -123,18 +123,20 @@ UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface, uint32_t qp_num,
return UCS_OK;

err_txqp_cleanup:
uct_rc_txqp_cleanup(&self->txqp);
uct_rc_txqp_cleanup(iface, &self->txqp);
return status;
}

static UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t)
{
uct_rc_iface_t *iface = ucs_derived_of(self->super.super.iface,
uct_rc_iface_t);
ucs_debug("destroy rc ep %p", self);

ucs_list_del(&self->list);
uct_rc_ep_pending_purge(&self->super.super, NULL, NULL);
uct_rc_fc_cleanup(&self->fc);
uct_rc_txqp_cleanup(&self->txqp);
uct_rc_txqp_cleanup(iface, &self->txqp);
}

UCS_CLASS_DEFINE(uct_rc_ep_t, uct_base_ep_t)
Expand Down Expand Up @@ -169,13 +171,13 @@ uct_rc_op_release_iface_resources(uct_rc_iface_send_op_t *op, int is_get_zcopy)
uct_rc_iface_t *iface;

if (is_get_zcopy) {
op->iface->tx.reads_available += op->length;
op->iface->tx.reads_completed += op->length;
return;
}

desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);
iface = ucs_container_of(ucs_mpool_obj_owner(desc), uct_rc_iface_t, tx.mp);
iface->tx.reads_available += op->length;
iface->tx.reads_completed += op->length;
}

void uct_rc_ep_get_bcopy_handler(uct_rc_iface_send_op_t *op, const void *resp)
Expand Down Expand Up @@ -337,8 +339,8 @@ ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self)
return status;
}

int uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
int is_log)
int uct_rc_txqp_purge_outstanding(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp,
ucs_status_t status, int is_log)
{
uct_rc_iface_send_op_t *op;
uct_rc_iface_send_desc_t *desc;
Expand All @@ -365,9 +367,11 @@ int uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
if ((op->handler == uct_rc_ep_get_bcopy_handler) ||
(op->handler == uct_rc_ep_get_bcopy_handler_no_completion)) {
uct_rc_op_release_iface_resources(op, 0);
uct_rc_iface_update_reads(iface);
iface_resources_released = 1;
} else if (op->handler == uct_rc_ep_get_zcopy_completion_handler) {
uct_rc_op_release_iface_resources(op, 1);
uct_rc_iface_update_reads(iface);
iface_resources_released = 1;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/uct/ib/rc/base/rc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ void uct_rc_fc_cleanup(uct_rc_fc_t *fc);

ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self);

int uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
int is_log);
int uct_rc_txqp_purge_outstanding(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp,
ucs_status_t status, int is_log);

ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available,
unsigned flags);
Expand All @@ -267,7 +267,7 @@ void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(64, 1)(uct_rc_iface_send_op_t *op,
ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface,
uint32_t qp_num
UCS_STATS_ARG(ucs_stats_node_t* stats_parent));
void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp);
void uct_rc_txqp_cleanup(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp);

static inline int16_t uct_rc_txqp_available(uct_rc_txqp_t *txqp)
{
Expand Down
3 changes: 2 additions & 1 deletion src/uct/ib/rc/base/rc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md,
self->tx.reads_available = config->tx.max_get_bytes;
}

self->tx.arb_cbq_id = UCS_CALLBACKQ_ID_NULL;
self->tx.arb_cbq_id = UCS_CALLBACKQ_ID_NULL;
self->tx.reads_completed = 0;

uct_ib_fence_info_init(&self->tx.fi);
uct_rc_iface_set_path_mtu(self, config);
Expand Down
10 changes: 10 additions & 0 deletions src/uct/ib/rc/base/rc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ struct uct_rc_iface {
* credit */
signed cq_available;
ssize_t reads_available;
ssize_t reads_completed;
uct_rc_iface_send_op_t *free_ops; /* stack of free send operations */
ucs_arbiter_t arbiter;
uct_rc_iface_send_op_t *ops_buffer;
Expand Down Expand Up @@ -393,6 +394,15 @@ uct_rc_iface_have_tx_cqe_avail(uct_rc_iface_t* iface)
return iface->tx.cq_available > 0;
}

static UCS_F_ALWAYS_INLINE void
uct_rc_iface_update_reads(uct_rc_iface_t *iface)
{
ucs_assert(iface->tx.reads_completed >= 0);

iface->tx.reads_available += iface->tx.reads_completed;
iface->tx.reads_completed = 0;
}

static UCS_F_ALWAYS_INLINE uct_rc_iface_send_op_t*
uct_rc_iface_get_send_op(uct_rc_iface_t *iface)
{
Expand Down
8 changes: 4 additions & 4 deletions src/uct/ib/rc/verbs/rc_verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ ucs_status_t uct_rc_verbs_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t);

UCT_RC_CHECK_AM_SHORT(id, length, iface->config.max_inline);
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);
UCT_RC_CHECK_FC(&iface->super, &ep->super, id);
uct_rc_verbs_iface_fill_inl_am_sge(iface, id, hdr, buffer, length);
UCT_TL_EP_STAT_OP(&ep->super.super, AM, SHORT, sizeof(hdr) + length);
Expand All @@ -276,7 +276,7 @@ ssize_t uct_rc_verbs_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,

UCT_CHECK_AM_ID(id);

UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);
UCT_RC_CHECK_FC(&iface->super, &ep->super, id);
UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC(&iface->super, &iface->super.tx.mp, desc,
id, uct_rc_am_hdr_fill, uct_rc_hdr_t,
Expand Down Expand Up @@ -309,7 +309,7 @@ ucs_status_t uct_rc_verbs_ep_am_zcopy(uct_ep_h tl_ep, uint8_t id, const void *he
UCT_RC_CHECK_AM_ZCOPY(id, header_length, uct_iov_total_length(iov, iovcnt),
iface->config.short_desc_size,
iface->super.super.config.seg_size);
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);
UCT_RC_CHECK_FC(&iface->super, &ep->super, id);

UCT_RC_IFACE_GET_TX_AM_ZCOPY_DESC(&iface->super, &iface->short_desc_mp,
Expand Down Expand Up @@ -461,7 +461,7 @@ ucs_status_t uct_rc_verbs_ep_handle_failure(uct_rc_verbs_ep_t *ep,
iface->tx.cq_available += ep->txcnt.pi - ep->txcnt.ci;
/* Reset CI to prevent cq_available overrun on ep_destroy */
ep->txcnt.ci = ep->txcnt.pi;
uct_rc_txqp_purge_outstanding(&ep->super.txqp, status, 0);
uct_rc_txqp_purge_outstanding(iface, &ep->super.txqp, status, 0);

return iface->super.ops->set_ep_failed(&iface->super, &ep->super.super.super,
status);
Expand Down
6 changes: 5 additions & 1 deletion src/uct/ib/rc/verbs/rc_verbs_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,21 @@ uct_rc_verbs_iface_poll_tx(uct_rc_verbs_iface_t *iface)
count = uct_rc_verbs_txcq_get_comp_count(&wc[i], &ep->super.txqp);
ucs_trace_poll("rc_verbs iface %p tx_wc wrid 0x%lx ep %p qpn 0x%x count %d",
iface, wc[i].wr_id, ep, wc[i].qp_num, count);

uct_rc_txqp_completion_desc(&ep->super.txqp, ep->txcnt.ci);

uct_rc_verbs_txqp_completed(&ep->super.txqp, &ep->txcnt, count);
iface->super.tx.cq_available += count;

uct_rc_iface_update_reads(&iface->super);

/* process pending elements prior to CQ entries to avoid out-of-order
* transmission in completion callbacks */
ucs_arbiter_group_schedule(&iface->super.tx.arbiter,
&ep->super.arb_group);
ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1,
uct_rc_ep_process_pending, NULL);

uct_rc_txqp_completion_desc(&ep->super.txqp, ep->txcnt.ci);
}

return num_wcs;
Expand Down
Loading

0 comments on commit e1228f9

Please sign in to comment.