Skip to content

Commit

Permalink
RC/MLX5: Don't send keepalive if already have outstanding operations
Browse files Browse the repository at this point in the history
  • Loading branch information
yosefe committed Aug 29, 2021
1 parent 6514de6 commit 0b5c09c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
9 changes: 7 additions & 2 deletions src/uct/ib/rc/accel/rc_mlx5_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_common_t *iface)
}

hw_ci = ntohs(cqe->wqe_counter);
ucs_trace_poll("rc_mlx5 iface %p tx_cqe: ep %p qpn 0x%x hw_ci %d", iface, ep,
qp_num, hw_ci);

uct_rc_mlx5_txqp_process_tx_cqe(&ep->super.txqp, cqe, hw_ci);

uct_rc_mlx5_common_update_tx_res(&iface->super, &ep->tx.wq, &ep->super.txqp,
hw_ci);
ucs_trace_data("rc_mlx5 iface %p tx_cqe: ep %p qpn 0x%x hw_ci %d reads_avail %zd",
iface, ep, qp_num, hw_ci, iface->super.tx.reads_available);

/* process pending elements prior to CQ entries to avoid out-of-order
* transmission in completion callbacks */
Expand Down Expand Up @@ -171,8 +171,13 @@ uct_rc_mlx5_common_ka_progress(uct_rc_mlx5_iface_common_t *iface)
return 0;
}


ucs_spin_lock(&iface->super.ep_list_lock);
ucs_list_for_each(ep, &iface->super.ep_list, super.list) {
if (ep->super.txqp.available < ep->tx.wq.bb_max) {
/* have outstanding operations */
continue;
}
ucs_trace("send keepalive grant on ep %p", ep);
uct_rc_ep_fc_send_grant(&ep->super);
}
Expand Down
18 changes: 15 additions & 3 deletions test/apps/iodemo/ucx_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,21 @@ int UcxContext::is_timeout_elapsed(struct timeval const *tv_prior, double timeou

void UcxContext::progress_timed_out_conns()
{
if (_conns_in_progress.empty()) {
return;
}

double time = get_time();
while (!_conns_in_progress.empty() &&
(get_time() > _conns_in_progress.begin()->first)) {
(time > _conns_in_progress.begin()->first)) {
UcxConnection *conn = _conns_in_progress.begin()->second;
conn->handle_connection_error(UCS_ERR_TIMED_OUT);
if (0) {
conn->handle_connection_error(UCS_ERR_TIMED_OUT);
} else {
conn->print_addresses();
UCX_LOG << "conn " << conn->get_log_prefix() << " timed out";
_conns_in_progress.begin()->first= time + 10.0;
}
}
}

Expand Down Expand Up @@ -977,10 +988,11 @@ void UcxConnection::established(ucs_status_t status)
{
if (status == UCS_OK) {
assert(_remote_conn_id != 0);
UCX_CONN_LOG << "Remote id is " << _remote_conn_id;
UCX_CONN_LOG << "remote id is " << _remote_conn_id;
}

_ucx_status = status;
print_addresses();
_context.remove_connection_inprogress(this);
invoke_callback(_establish_cb, status);
}
Expand Down
4 changes: 2 additions & 2 deletions test/apps/iodemo/ucx_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ class UcxConnection {
return _num_instances;
}

void print_addresses();

private:
static ucp_tag_t make_data_tag(uint32_t conn_id, uint32_t sn);

Expand All @@ -328,8 +330,6 @@ class UcxConnection {

void set_log_prefix(const struct sockaddr* saddr, socklen_t addrlen);

void print_addresses();

void connect_common(ucp_ep_params_t &ep_params, UcxCallback *callback);

void connect_tag(UcxCallback *callback);
Expand Down

0 comments on commit 0b5c09c

Please sign in to comment.