From 41500a726b5412ea824d955ea386d9d51556b3c7 Mon Sep 17 00:00:00 2001 From: Sergey Oblomov Date: Wed, 7 Jul 2021 10:24:20 +0300 Subject: [PATCH] RC/KEEPALIVE: update for KA call - keepalive procedure is initiated from progress callback now --- src/uct/ib/rc/accel/rc_mlx5_common.c | 21 ----------- src/uct/ib/rc/accel/rc_mlx5_common.h | 6 +++- src/uct/ib/rc/accel/rc_mlx5_iface.c | 54 ++++++++++++++++++++++------ 3 files changed, 49 insertions(+), 32 deletions(-) diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.c b/src/uct/ib/rc/accel/rc_mlx5_common.c index 8563087348d..80f59d53149 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_common.c +++ b/src/uct/ib/rc/accel/rc_mlx5_common.c @@ -79,23 +79,6 @@ ucs_config_field_t uct_rc_mlx5_common_config_table[] = { {NULL} }; -static void -uct_rc_mlx5_iface_common_send_keepalive(uct_rc_mlx5_iface_common_t *iface) -{ - uct_rc_mlx5_ep_t *ep; - - ucs_spin_lock(&iface->super.ep_list_lock); - ucs_list_for_each(ep, &iface->super.ep_list, super.list) { - ucs_trace("send keepalive grant on ep %p", ep); - uct_rc_ep_fc_send_grant(&ep->super); - } - ucs_spin_unlock(&iface->super.ep_list_lock); - - uct_rc_mlx5_iface_print(iface, "keepalive"); - - iface->ka_time = ucs_get_time() + iface->config.ka_interval; -} - unsigned uct_rc_mlx5_iface_srq_post_recv(uct_rc_mlx5_iface_common_t *iface) { uct_ib_mlx5_srq_t *srq = &iface->rx.srq; @@ -160,10 +143,6 @@ unsigned uct_rc_mlx5_iface_srq_post_recv(uct_rc_mlx5_iface_common_t *iface) ucs_assert(uct_ib_mlx5_srq_get_wqe(srq, srq->mask)->srq.next_wqe_index == 0); } - if (ucs_unlikely(ucs_get_time() > iface->ka_time)) { - uct_rc_mlx5_iface_common_send_keepalive(iface); - } - return count; } diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.h b/src/uct/ib/rc/accel/rc_mlx5_common.h index bc2f8e2a8fb..675fc5572d6 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_common.h +++ b/src/uct/ib/rc/accel/rc_mlx5_common.h @@ -371,7 +371,11 @@ typedef struct uct_rc_mlx5_iface_common { void *pref_ptr; } rx; uct_ib_mlx5_cq_t cq[UCT_IB_DIR_NUM]; - ucs_time_t ka_time; + struct { + ucs_time_t time; + unsigned iter_count; + uct_worker_cb_id_t prog_id; + } keepalive; struct { uct_rc_mlx5_cmd_wq_t cmd_wq; uct_rc_mlx5_tag_entry_t *head; diff --git a/src/uct/ib/rc/accel/rc_mlx5_iface.c b/src/uct/ib/rc/accel/rc_mlx5_iface.c index 0cb3e9a7055..0cc1bf1db77 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_iface.c +++ b/src/uct/ib/rc/accel/rc_mlx5_iface.c @@ -20,10 +20,13 @@ #include "rc_mlx5.inl" +#define UCP_WORKER_KEEPALIVE_ITER_SKIP 32 + + enum { UCT_RC_MLX5_IFACE_ADDR_TYPE_BASIC, - /* Tag Matching address. It additionaly contains QP number which + /* Tag Matching address. It additionally contains QP number which * is used for hardware offloads. */ UCT_RC_MLX5_IFACE_ADDR_TYPE_TM }; @@ -573,6 +576,33 @@ int uct_rc_mlx5_iface_is_reachable(const uct_iface_h tl_iface, return uct_ib_iface_is_reachable(tl_iface, dev_addr, iface_addr); } +static unsigned uct_rc_mlx5_common_ka_progress(void *arg) +{ + uct_rc_mlx5_iface_common_t *iface = arg; + uct_rc_mlx5_ep_t *ep; + + if ((iface->keepalive.iter_count++ % UCP_WORKER_KEEPALIVE_ITER_SKIP) != 0) { + return 0; + } + + if (ucs_unlikely(ucs_get_time() < iface->keepalive.time)) { + return 0; + } + + ucs_spin_lock(&iface->super.ep_list_lock); + ucs_list_for_each(ep, &iface->super.ep_list, super.list) { + ucs_trace("send keepalive grant on ep %p", ep); + uct_rc_ep_fc_send_grant(&ep->super); + } + ucs_spin_unlock(&iface->super.ep_list_lock); + + uct_rc_mlx5_iface_print(iface, "keepalive"); + + iface->keepalive.time = ucs_get_time() + iface->config.ka_interval; + + return 1; +} + UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_common_t, uct_rc_iface_ops_t *ops, uct_md_h md, uct_worker_h worker, @@ -598,16 +628,18 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_common_t, UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t, ops, md, worker, params, rc_config, init_attr); - dev = uct_ib_iface_device(&self->super.super); - self->tx.mmio_mode = mlx5_config->super.mmio_mode; - self->tx.bb_max = ucs_min(mlx5_config->tx_max_bb, UINT16_MAX); - self->tm.am_desc.super.cb = uct_rc_mlx5_release_desc; - if (mlx5_config->ka_interval < 1e-6) { - self->config.ka_interval = UCS_TIME_INFINITY; - self->ka_time = UCS_TIME_INFINITY; - } else { + dev = uct_ib_iface_device(&self->super.super); + self->tx.mmio_mode = mlx5_config->super.mmio_mode; + self->tx.bb_max = ucs_min(mlx5_config->tx_max_bb, UINT16_MAX); + self->tm.am_desc.super.cb = uct_rc_mlx5_release_desc; + self->keepalive.iter_count = 0; + self->keepalive.prog_id = UCS_CALLBACKQ_ID_NULL; + if (mlx5_config->ka_interval > 1e-6) { self->config.ka_interval = ucs_time_from_sec(mlx5_config->ka_interval); - self->ka_time = ucs_get_time() + self->config.ka_interval; + self->keepalive.time = ucs_get_time() + self->config.ka_interval; + uct_worker_progress_register_safe(worker, + uct_rc_mlx5_common_ka_progress, self, + 0, &self->keepalive.prog_id); } if (!UCT_RC_MLX5_MP_ENABLED(self)) { @@ -714,6 +746,8 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_iface_common_t) ucs_mpool_cleanup(&self->tx.atomic_desc_mp, 1); uct_rc_mlx5_iface_common_dm_cleanup(self); uct_rc_mlx5_iface_common_tag_cleanup(self); + uct_worker_progress_unregister_safe(&self->super.super.super.worker->super, + &self->keepalive.prog_id); UCS_STATS_NODE_FREE(self->stats); }