diff --git a/src/uct/ib/rc/accel/rc_mlx5_iface.c b/src/uct/ib/rc/accel/rc_mlx5_iface.c index e784ab931cc4..cb0979b39ba0 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_iface.c +++ b/src/uct/ib/rc/accel/rc_mlx5_iface.c @@ -126,13 +126,13 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_common_t *iface) } hw_ci = ntohs(cqe->wqe_counter); - ucs_trace_poll("rc_mlx5 iface %p tx_cqe: ep %p qpn 0x%x hw_ci %d", iface, ep, - qp_num, hw_ci); uct_rc_mlx5_txqp_process_tx_cqe(&ep->super.txqp, cqe, hw_ci); uct_rc_mlx5_common_update_tx_res(&iface->super, &ep->tx.wq, &ep->super.txqp, hw_ci); + ucs_trace_data("rc_mlx5 iface %p tx_cqe: ep %p qpn 0x%x hw_ci %d reads_avail %zd", + iface, ep, qp_num, hw_ci, iface->super.tx.reads_available); /* process pending elements prior to CQ entries to avoid out-of-order * transmission in completion callbacks */ @@ -171,8 +171,13 @@ uct_rc_mlx5_common_ka_progress(uct_rc_mlx5_iface_common_t *iface) return 0; } + ucs_spin_lock(&iface->super.ep_list_lock); ucs_list_for_each(ep, &iface->super.ep_list, super.list) { + if (ep->super.txqp.available < ep->tx.wq.bb_max) { + /* have outstanding operations */ + continue; + } ucs_trace("send keepalive grant on ep %p", ep); uct_rc_ep_fc_send_grant(&ep->super); } diff --git a/test/apps/iodemo/ucx_wrapper.cc b/test/apps/iodemo/ucx_wrapper.cc index a3f0e00933de..4168434bddda 100644 --- a/test/apps/iodemo/ucx_wrapper.cc +++ b/test/apps/iodemo/ucx_wrapper.cc @@ -355,10 +355,21 @@ int UcxContext::is_timeout_elapsed(struct timeval const *tv_prior, double timeou void UcxContext::progress_timed_out_conns() { + if (_conns_in_progress.empty()) { + return; + } + + double time = get_time(); while (!_conns_in_progress.empty() && - (get_time() > _conns_in_progress.begin()->first)) { + (time > _conns_in_progress.begin()->first)) { UcxConnection *conn = _conns_in_progress.begin()->second; - conn->handle_connection_error(UCS_ERR_TIMED_OUT); + if (0) { + conn->handle_connection_error(UCS_ERR_TIMED_OUT); + } else { + conn->print_addresses(); + UCX_LOG << "conn " << conn->get_log_prefix() << " timed out"; + _conns_in_progress.begin()->first= time + 10.0; + } } } @@ -977,10 +988,11 @@ void UcxConnection::established(ucs_status_t status) { if (status == UCS_OK) { assert(_remote_conn_id != 0); - UCX_CONN_LOG << "Remote id is " << _remote_conn_id; + UCX_CONN_LOG << "remote id is " << _remote_conn_id; } _ucx_status = status; + print_addresses(); _context.remove_connection_inprogress(this); invoke_callback(_establish_cb, status); } diff --git a/test/apps/iodemo/ucx_wrapper.h b/test/apps/iodemo/ucx_wrapper.h index 0f06702b0bb0..f16c9a564fdb 100644 --- a/test/apps/iodemo/ucx_wrapper.h +++ b/test/apps/iodemo/ucx_wrapper.h @@ -309,6 +309,8 @@ class UcxConnection { return _num_instances; } + void print_addresses(); + private: static ucp_tag_t make_data_tag(uint32_t conn_id, uint32_t sn); @@ -328,8 +330,6 @@ class UcxConnection { void set_log_prefix(const struct sockaddr* saddr, socklen_t addrlen); - void print_addresses(); - void connect_common(ucp_ep_params_t &ep_params, UcxCallback *callback); void connect_tag(UcxCallback *callback);