Skip to content

Commit

Permalink
RC/KEEPALIVE: update for KA call
Browse files Browse the repository at this point in the history
- keepalive procedure is initiated from progress callback now
  • Loading branch information
Sergey Oblomov committed Jul 7, 2021
1 parent 09ec828 commit 41500a7
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 32 deletions.
21 changes: 0 additions & 21 deletions src/uct/ib/rc/accel/rc_mlx5_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,6 @@ ucs_config_field_t uct_rc_mlx5_common_config_table[] = {
{NULL}
};

static void
uct_rc_mlx5_iface_common_send_keepalive(uct_rc_mlx5_iface_common_t *iface)
{
uct_rc_mlx5_ep_t *ep;

ucs_spin_lock(&iface->super.ep_list_lock);
ucs_list_for_each(ep, &iface->super.ep_list, super.list) {
ucs_trace("send keepalive grant on ep %p", ep);
uct_rc_ep_fc_send_grant(&ep->super);
}
ucs_spin_unlock(&iface->super.ep_list_lock);

uct_rc_mlx5_iface_print(iface, "keepalive");

iface->ka_time = ucs_get_time() + iface->config.ka_interval;
}

unsigned uct_rc_mlx5_iface_srq_post_recv(uct_rc_mlx5_iface_common_t *iface)
{
uct_ib_mlx5_srq_t *srq = &iface->rx.srq;
Expand Down Expand Up @@ -160,10 +143,6 @@ unsigned uct_rc_mlx5_iface_srq_post_recv(uct_rc_mlx5_iface_common_t *iface)
ucs_assert(uct_ib_mlx5_srq_get_wqe(srq, srq->mask)->srq.next_wqe_index == 0);
}

if (ucs_unlikely(ucs_get_time() > iface->ka_time)) {
uct_rc_mlx5_iface_common_send_keepalive(iface);
}

return count;
}

Expand Down
6 changes: 5 additions & 1 deletion src/uct/ib/rc/accel/rc_mlx5_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ typedef struct uct_rc_mlx5_iface_common {
void *pref_ptr;
} rx;
uct_ib_mlx5_cq_t cq[UCT_IB_DIR_NUM];
ucs_time_t ka_time;
struct {
ucs_time_t time;
unsigned iter_count;
uct_worker_cb_id_t prog_id;
} keepalive;
struct {
uct_rc_mlx5_cmd_wq_t cmd_wq;
uct_rc_mlx5_tag_entry_t *head;
Expand Down
54 changes: 44 additions & 10 deletions src/uct/ib/rc/accel/rc_mlx5_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
#include "rc_mlx5.inl"


#define UCP_WORKER_KEEPALIVE_ITER_SKIP 32


enum {
UCT_RC_MLX5_IFACE_ADDR_TYPE_BASIC,

/* Tag Matching address. It additionaly contains QP number which
/* Tag Matching address. It additionally contains QP number which
* is used for hardware offloads. */
UCT_RC_MLX5_IFACE_ADDR_TYPE_TM
};
Expand Down Expand Up @@ -573,6 +576,33 @@ int uct_rc_mlx5_iface_is_reachable(const uct_iface_h tl_iface,
return uct_ib_iface_is_reachable(tl_iface, dev_addr, iface_addr);
}

static unsigned uct_rc_mlx5_common_ka_progress(void *arg)
{
uct_rc_mlx5_iface_common_t *iface = arg;
uct_rc_mlx5_ep_t *ep;

if ((iface->keepalive.iter_count++ % UCP_WORKER_KEEPALIVE_ITER_SKIP) != 0) {
return 0;
}

if (ucs_unlikely(ucs_get_time() < iface->keepalive.time)) {
return 0;
}

ucs_spin_lock(&iface->super.ep_list_lock);
ucs_list_for_each(ep, &iface->super.ep_list, super.list) {
ucs_trace("send keepalive grant on ep %p", ep);
uct_rc_ep_fc_send_grant(&ep->super);
}
ucs_spin_unlock(&iface->super.ep_list_lock);

uct_rc_mlx5_iface_print(iface, "keepalive");

iface->keepalive.time = ucs_get_time() + iface->config.ka_interval;

return 1;
}

UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_common_t,
uct_rc_iface_ops_t *ops,
uct_md_h md, uct_worker_h worker,
Expand All @@ -598,16 +628,18 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_common_t,
UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t, ops, md, worker, params,
rc_config, init_attr);

dev = uct_ib_iface_device(&self->super.super);
self->tx.mmio_mode = mlx5_config->super.mmio_mode;
self->tx.bb_max = ucs_min(mlx5_config->tx_max_bb, UINT16_MAX);
self->tm.am_desc.super.cb = uct_rc_mlx5_release_desc;
if (mlx5_config->ka_interval < 1e-6) {
self->config.ka_interval = UCS_TIME_INFINITY;
self->ka_time = UCS_TIME_INFINITY;
} else {
dev = uct_ib_iface_device(&self->super.super);
self->tx.mmio_mode = mlx5_config->super.mmio_mode;
self->tx.bb_max = ucs_min(mlx5_config->tx_max_bb, UINT16_MAX);
self->tm.am_desc.super.cb = uct_rc_mlx5_release_desc;
self->keepalive.iter_count = 0;
self->keepalive.prog_id = UCS_CALLBACKQ_ID_NULL;
if (mlx5_config->ka_interval > 1e-6) {
self->config.ka_interval = ucs_time_from_sec(mlx5_config->ka_interval);
self->ka_time = ucs_get_time() + self->config.ka_interval;
self->keepalive.time = ucs_get_time() + self->config.ka_interval;
uct_worker_progress_register_safe(worker,
uct_rc_mlx5_common_ka_progress, self,
0, &self->keepalive.prog_id);
}

if (!UCT_RC_MLX5_MP_ENABLED(self)) {
Expand Down Expand Up @@ -714,6 +746,8 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_iface_common_t)
ucs_mpool_cleanup(&self->tx.atomic_desc_mp, 1);
uct_rc_mlx5_iface_common_dm_cleanup(self);
uct_rc_mlx5_iface_common_tag_cleanup(self);
uct_worker_progress_unregister_safe(&self->super.super.super.worker->super,
&self->keepalive.prog_id);
UCS_STATS_NODE_FREE(self->stats);
}

Expand Down

0 comments on commit 41500a7

Please sign in to comment.