diff --git a/src/tools/perf/libperf.c b/src/tools/perf/libperf.c index f3c2ebb08bb..c8d968327a8 100644 --- a/src/tools/perf/libperf.c +++ b/src/tools/perf/libperf.c @@ -1173,6 +1173,9 @@ static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf, ucx_perf_params_t * goto out_free_mem; } + uct_iface_progress_enable(perf->uct.iface, + UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); + return UCS_OK; out_free_mem: diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index a10dd627e38..298f55d7279 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -978,7 +978,7 @@ void ucp_context_tag_offload_enable(ucp_context_h context) offload_iface = ucs_queue_head_elem_non_empty(&context->tm.offload.ifaces, ucp_worker_iface_t, queue); - ucp_worker_iface_activate(offload_iface); + ucp_worker_iface_activate(offload_iface, 0); ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu", context->tm.offload.thresh, context->tm.offload.zcopy_thresh); diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index e8e2ad34107..70c1e27e7ad 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -381,7 +381,7 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) } } -void ucp_worker_iface_activate(ucp_worker_iface_t *wiface) +void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags) { ucp_worker_h worker = wiface->worker; ucs_status_t status; @@ -408,7 +408,7 @@ void ucp_worker_iface_activate(ucp_worker_iface_t *wiface) } uct_iface_progress_enable(wiface->iface, - UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); + UCT_PROGRESS_SEND | UCT_PROGRESS_RECV | uct_flags); } static void ucp_worker_iface_deactivate(ucp_worker_iface_t *wiface, int force) @@ -451,7 +451,12 @@ void ucp_worker_iface_progress_ep(ucp_worker_iface_t *wiface) ucs_trace_func("iface=%p", wiface->iface); UCS_ASYNC_BLOCK(&wiface->worker->async); - ucp_worker_iface_activate(wiface); + + /* This function may be called from progress thread (such as when processing + * wireup messages), so ask UCT to be thread-safe. + */ + ucp_worker_iface_activate(wiface, UCT_PROGRESS_THREAD_SAFE); + UCS_ASYNC_UNBLOCK(&wiface->worker->async); } @@ -488,7 +493,7 @@ static ucs_status_t ucp_worker_iface_check_events_do(ucp_worker_iface_t *wiface, *progress_count = uct_iface_progress(wiface->iface); if (prev_am_count != wiface->proxy_am_count) { /* Received relevant active messages, activate the interface */ - ucp_worker_iface_activate(wiface); + ucp_worker_iface_activate(wiface, 0); return UCS_OK; } else if (*progress_count == 0) { /* Arm the interface to wait for next event */ @@ -682,10 +687,6 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id, tl_id, wiface->iface, UCT_TL_RESOURCE_DESC_ARG(&resource->tl_rsc), worker); - /* Disable progress until we know better */ - uct_iface_progress_disable(wiface->iface, UCT_PROGRESS_SEND | - UCT_PROGRESS_RECV); - VALGRIND_MAKE_MEM_UNDEFINED(&wiface->attr, sizeof(wiface->attr)); status = uct_iface_query(wiface->iface, &wiface->attr); if (status != UCS_OK) { @@ -725,7 +726,7 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id, { ucp_worker_iface_deactivate(wiface, 1); } else { - ucp_worker_iface_activate(wiface); + ucp_worker_iface_activate(wiface, 0); } } diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 916755910a5..dba1dca46ca 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -169,7 +169,7 @@ void ucp_worker_iface_unprogress_ep(ucp_worker_iface_t *wiface); void ucp_worker_signal_internal(ucp_worker_h worker); -void ucp_worker_iface_activate(ucp_worker_iface_t *wiface); +void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags); static inline const char* ucp_worker_get_name(ucp_worker_h worker) { diff --git a/src/uct/api/uct.h b/src/uct/api/uct.h index 131fcd13715..4c747a5497b 100644 --- a/src/uct/api/uct.h +++ b/src/uct/api/uct.h @@ -290,8 +290,11 @@ enum uct_flush_flags { * @brief UCT progress types */ enum uct_progress_types { - UCT_PROGRESS_SEND = UCS_BIT(0), /**< Progress send operations */ - UCT_PROGRESS_RECV = UCS_BIT(1) /**< Progress receive operations */ + UCT_PROGRESS_SEND = UCS_BIT(0), /**< Progress send operations */ + UCT_PROGRESS_RECV = UCS_BIT(1), /**< Progress receive operations */ + UCT_PROGRESS_THREAD_SAFE = UCS_BIT(7) /**< Enable/disable progress while + another thread may be calling + @ref ucp_worker_progress(). */ }; @@ -2306,11 +2309,15 @@ UCT_INLINE_API ucs_status_t uct_iface_tag_recv_cancel(uct_iface_h iface, * Notify the transport that it should actively progress communications during * @ref uct_worker_progress(). * - * When the interface is created, its progress is enabled. + * When the interface is created, its progress is initially disabled. * * @param [in] iface The interface to enable progress. * @param [in] flags The type of progress to enable as defined by - * @ref uct_progress_types. + * @ref uct_progress_types + * + * @note This function is not thread safe with respect to + * @ref ucp_worker_progress(), unless the flag + * @ref UCT_PROGRESS_THREAD_SAFE is specified. * */ UCT_INLINE_API void uct_iface_progress_enable(uct_iface_h iface, unsigned flags) @@ -2327,12 +2334,16 @@ UCT_INLINE_API void uct_iface_progress_enable(uct_iface_h iface, unsigned flags) * @ref uct_worker_progress(). Thus the latency of other transports may be * improved. * - * By default, progress is enabled when the interface is created. + * By default, progress is disabled when the interface is created. * * @param [in] iface The interface to disable progress. * @param [in] flags The type of progress to disable as defined by * @ref uct_progress_types. * + * @note This function is not thread safe with respect to + * @ref ucp_worker_progress(), unless the flag + * @ref UCT_PROGRESS_THREAD_SAFE is specified. + * */ UCT_INLINE_API void uct_iface_progress_disable(uct_iface_h iface, unsigned flags) { diff --git a/src/uct/base/uct_iface.c b/src/uct/base/uct_iface.c index 8ff721b9ce6..fe794255c2c 100644 --- a/src/uct/base/uct_iface.c +++ b/src/uct/base/uct_iface.c @@ -185,15 +185,31 @@ void uct_iface_close(uct_iface_h iface) void uct_base_iface_progress_enable(uct_iface_h tl_iface, unsigned flags) { uct_base_iface_t *iface = ucs_derived_of(tl_iface, uct_base_iface_t); + uct_base_iface_progress_enable_cb(iface, + (ucs_callback_t)iface->super.ops.iface_progress, + flags); +} + +void uct_base_iface_progress_enable_cb(uct_base_iface_t *iface, + ucs_callback_t cb, unsigned flags) +{ uct_priv_worker_t *worker = iface->worker; + unsigned thread_safe; UCS_ASYNC_BLOCK(worker->async); + thread_safe = flags & UCT_PROGRESS_THREAD_SAFE; + flags &= ~UCT_PROGRESS_THREAD_SAFE; + /* Add callback only if previous flags are 0 and new flags != 0 */ - if (!iface->progress_flags && flags) { - if (iface->prog.id == UCS_CALLBACKQ_ID_NULL) { - iface->prog.id = ucs_callbackq_add(&worker->super.progress_q, - (ucs_callback_t)iface->super.ops.iface_progress, + if ((!iface->progress_flags && flags) && + (iface->prog.id == UCS_CALLBACKQ_ID_NULL)) { + if (thread_safe) { + iface->prog.id = ucs_callbackq_add_safe(&worker->super.progress_q, + cb, iface, + UCS_CALLBACKQ_FLAG_FAST); + } else { + iface->prog.id = ucs_callbackq_add(&worker->super.progress_q, cb, iface, UCS_CALLBACKQ_FLAG_FAST); } } @@ -206,17 +222,24 @@ void uct_base_iface_progress_disable(uct_iface_h tl_iface, unsigned flags) { uct_base_iface_t *iface = ucs_derived_of(tl_iface, uct_base_iface_t); uct_priv_worker_t *worker = iface->worker; + unsigned thread_safe; UCS_ASYNC_BLOCK(worker->async); + thread_safe = flags & UCT_PROGRESS_THREAD_SAFE; + flags &= ~UCT_PROGRESS_THREAD_SAFE; + /* Remove callback only if previous flags != 0, and removing the given * flags makes it become 0. */ - if (iface->progress_flags && !(iface->progress_flags & ~flags)) { - if (iface->prog.id != UCS_CALLBACKQ_ID_NULL) { + if ((iface->progress_flags && !(iface->progress_flags & ~flags)) && + (iface->prog.id != UCS_CALLBACKQ_ID_NULL)) { + if (thread_safe) { + ucs_callbackq_remove_safe(&worker->super.progress_q, iface->prog.id); + } else { ucs_callbackq_remove(&worker->super.progress_q, iface->prog.id); - iface->prog.id = UCS_CALLBACKQ_ID_NULL; } + iface->prog.id = UCS_CALLBACKQ_ID_NULL; } iface->progress_flags &= ~flags; diff --git a/src/uct/base/uct_iface.h b/src/uct/base/uct_iface.h index 12abee28d0e..b52ad540b7a 100644 --- a/src/uct/base/uct_iface.h +++ b/src/uct/base/uct_iface.h @@ -481,6 +481,9 @@ ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags); void uct_base_iface_progress_enable(uct_iface_h tl_iface, unsigned flags); +void uct_base_iface_progress_enable_cb(uct_base_iface_t *iface, + ucs_callback_t cb, unsigned flags); + void uct_base_iface_progress_disable(uct_iface_h tl_iface, unsigned flags); ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags, diff --git a/src/uct/ib/dc/accel/dc_mlx5.c b/src/uct/ib/dc/accel/dc_mlx5.c index e7f390cd8cd..60a85a19f00 100644 --- a/src/uct/ib/dc/accel/dc_mlx5.c +++ b/src/uct/ib/dc/accel/dc_mlx5.c @@ -801,9 +801,6 @@ static UCS_CLASS_INIT_FUNC(uct_dc_mlx5_iface_t, uct_md_h md, uct_worker_h worker UCT_IB_MLX5_AV_FULL_SIZE) / sizeof(struct mlx5_wqe_data_seg)); - /* TODO: only register progress when we have a connection */ - uct_base_iface_progress_enable(&self->super.super.super.super.super, - UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); ucs_debug("created dc iface %p", self); return UCS_OK; diff --git a/src/uct/ib/dc/verbs/dc_verbs.c b/src/uct/ib/dc/verbs/dc_verbs.c index 882769178a9..3308f847e00 100644 --- a/src/uct/ib/dc/verbs/dc_verbs.c +++ b/src/uct/ib/dc/verbs/dc_verbs.c @@ -767,9 +767,9 @@ uct_dc_verbs_poll_tx(uct_dc_verbs_iface_t *iface) return num_wcs; } -static unsigned uct_dc_verbs_iface_progress(uct_iface_h tl_iface) +static unsigned uct_dc_verbs_iface_progress(void *arg) { - uct_dc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_verbs_iface_t); + uct_dc_verbs_iface_t *iface = arg; unsigned count; count = uct_rc_verbs_iface_poll_rx_common(&iface->super.super); @@ -978,9 +978,9 @@ static ucs_status_t uct_dc_verbs_iface_tag_recv_cancel(uct_iface_h tl_iface, ctx, force); } -static unsigned uct_dc_verbs_iface_progress_tm(uct_iface_h tl_iface) +static unsigned uct_dc_verbs_iface_progress_tm(void *arg) { - uct_dc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_verbs_iface_t); + uct_dc_verbs_iface_t *iface = arg; unsigned count; count = uct_rc_verbs_iface_poll_rx_tm(&iface->verbs_common, @@ -997,9 +997,8 @@ static ucs_status_t uct_dc_verbs_iface_tag_init(uct_dc_verbs_iface_t *iface, uct_dc_verbs_iface_config_t *config) { - uct_iface_t *tl_iface = &iface->super.super.super.super.super; - #if IBV_EXP_HW_TM_DC + if (UCT_RC_VERBS_TM_ENABLED(&iface->verbs_common)) { struct ibv_exp_create_srq_attr srq_init_attr = {}; struct ibv_exp_srq_dc_offload_params dc_op = {}; @@ -1032,13 +1031,13 @@ uct_dc_verbs_iface_tag_init(uct_dc_verbs_iface_t *iface, return status; } - - tl_iface->ops.iface_progress = uct_dc_verbs_iface_progress_tm; - } + iface->verbs_common.progress = uct_dc_verbs_iface_progress_tm; + } else #endif + { + iface->verbs_common.progress = uct_dc_verbs_iface_progress; + } - uct_base_iface_progress_enable(tl_iface, - UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); return UCS_OK; } @@ -1096,6 +1095,19 @@ uct_dc_verbs_iface_event_arm(uct_iface_h tl_iface, unsigned events) UCT_RC_VERBS_TM_ENABLED(&iface->verbs_common)); } +static void uct_dc_verbs_iface_progress_enable(uct_iface_h tl_iface, unsigned flags) +{ + uct_dc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_verbs_iface_t); + uct_rc_verbs_iface_common_progress_enable(&iface->verbs_common, + &iface->super.super, flags); +} + +static unsigned uct_dc_verbs_iface_do_progress(uct_iface_h tl_iface) +{ + uct_dc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_verbs_iface_t); + return iface->verbs_common.progress(iface); +} + static void UCS_CLASS_DELETE_FUNC_NAME(uct_dc_verbs_iface_t)(uct_iface_t*); static uct_dc_iface_ops_t uct_dc_verbs_iface_ops = { @@ -1136,9 +1148,9 @@ static uct_dc_iface_ops_t uct_dc_verbs_iface_ops = { #endif .iface_flush = uct_dc_iface_flush, .iface_fence = uct_base_iface_fence, - .iface_progress_enable = uct_base_iface_progress_enable, + .iface_progress_enable = uct_dc_verbs_iface_progress_enable, .iface_progress_disable = uct_base_iface_progress_disable, - .iface_progress = uct_dc_verbs_iface_progress, + .iface_progress = uct_dc_verbs_iface_do_progress, .iface_event_fd_get = uct_ib_iface_event_fd_get, .iface_event_arm = uct_dc_verbs_iface_event_arm, .iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_dc_verbs_iface_t), diff --git a/src/uct/ib/rc/accel/rc_mlx5_ep.c b/src/uct/ib/rc/accel/rc_mlx5_ep.c index 719f314e5d2..28e142acb3e 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_ep.c +++ b/src/uct/ib/rc/accel/rc_mlx5_ep.c @@ -443,10 +443,6 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, uct_iface_h tl_iface) self->qp_num = self->super.txqp.qp->qp_num; self->tx.wq.bb_max = ucs_min(self->tx.wq.bb_max, iface->tx.bb_max); uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max); - - uct_worker_progress_add_safe(iface->super.super.super.worker, - uct_rc_mlx5_iface_progress, iface, - &iface->super.super.super.prog); return UCS_OK; } @@ -455,8 +451,6 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_ep_t) uct_rc_mlx5_iface_t *iface = ucs_derived_of(self->super.super.super.iface, uct_rc_mlx5_iface_t); - uct_worker_progress_remove(iface->super.super.super.worker, - &iface->super.super.super.prog); uct_ib_mlx5_txwq_cleanup(&self->tx.wq); /* Modify QP to error to make HW generate CQEs for all in-progress SRQ diff --git a/src/uct/ib/rc/accel/rc_mlx5_iface.c b/src/uct/ib/rc/accel/rc_mlx5_iface.c index 162110a8ef1..d47962ed9e4 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_iface.c +++ b/src/uct/ib/rc/accel/rc_mlx5_iface.c @@ -174,6 +174,8 @@ static UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_t, uct_md_h md, uct_worker_h worker static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_iface_t) { + uct_base_iface_progress_disable(&self->super.super.super.super, + UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common); } @@ -213,8 +215,8 @@ static uct_rc_iface_ops_t uct_rc_mlx5_iface_ops = { .ep_connect_to_ep = uct_rc_ep_connect_to_ep, .iface_flush = uct_rc_iface_flush, .iface_fence = uct_base_iface_fence, - .iface_progress_enable = ucs_empty_function, - .iface_progress_disable = ucs_empty_function, + .iface_progress_enable = uct_base_iface_progress_enable, + .iface_progress_disable = uct_base_iface_progress_disable, .iface_progress = (void*)uct_rc_mlx5_iface_progress, .iface_event_fd_get = uct_ib_iface_event_fd_get, .iface_event_arm = uct_rc_iface_event_arm, diff --git a/src/uct/ib/rc/verbs/rc_verbs.h b/src/uct/ib/rc/verbs/rc_verbs.h index e3e7067a526..750de26e8af 100644 --- a/src/uct/ib/rc/verbs/rc_verbs.h +++ b/src/uct/ib/rc/verbs/rc_verbs.h @@ -57,9 +57,6 @@ typedef struct uct_rc_verbs_iface { struct { unsigned tx_max_wr; } config; - - /* Progress function (either regular or TM aware) */ - ucs_callback_t progress; } uct_rc_verbs_iface_t; @@ -198,8 +195,6 @@ ucs_status_t uct_rc_verbs_ep_connect_to_ep(uct_ep_h tl_ep, ucs_status_t uct_rc_verbs_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr); -unsigned uct_rc_verbs_iface_progress(void *arg); - ucs_status_t uct_rc_verbs_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, uct_rc_fc_request_t *req); diff --git a/src/uct/ib/rc/verbs/rc_verbs_common.c b/src/uct/ib/rc/verbs/rc_verbs_common.c index badc2b9d974..106564a1e4e 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_common.c +++ b/src/uct/ib/rc/verbs/rc_verbs_common.c @@ -165,8 +165,15 @@ ucs_status_t uct_rc_verbs_iface_prepost_recvs_common(uct_rc_iface_t *iface) return UCS_OK; } -#if IBV_EXP_HW_TM +void uct_rc_verbs_iface_common_progress_enable(uct_rc_verbs_iface_common_t *iface, + uct_rc_iface_t *rc_iface, + unsigned flags) +{ + uct_base_iface_progress_enable_cb(&rc_iface->super.super, iface->progress, + flags); +} +#if IBV_EXP_HW_TM static void uct_rc_verbs_iface_release_desc(uct_recv_desc_t *self, void *desc) { diff --git a/src/uct/ib/rc/verbs/rc_verbs_common.h b/src/uct/ib/rc/verbs/rc_verbs_common.h index 8463e87adbf..9a6602f001e 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_common.h +++ b/src/uct/ib/rc/verbs/rc_verbs_common.h @@ -124,6 +124,10 @@ typedef struct uct_rc_verbs_iface_common { uct_rc_verbs_release_desc_t rndv_desc; } tm; #endif + + /* Progress function (either regular or TM aware) */ + ucs_callback_t progress; + /* TODO: make a separate datatype */ struct { size_t notag_hdr_size; @@ -172,6 +176,10 @@ void uct_rc_verbs_iface_common_tag_cleanup(uct_rc_verbs_iface_common_t *iface); ucs_status_t uct_rc_verbs_iface_prepost_recvs_common(uct_rc_iface_t *iface); +void uct_rc_verbs_iface_common_progress_enable(uct_rc_verbs_iface_common_t *iface, + uct_rc_iface_t *rc_iface, + unsigned flags); + void uct_rc_verbs_iface_common_query(uct_rc_verbs_iface_common_t *verbs_iface, uct_rc_iface_t *rc_iface, uct_iface_attr_t *iface_attr); diff --git a/src/uct/ib/rc/verbs/rc_verbs_ep.c b/src/uct/ib/rc/verbs/rc_verbs_ep.c index 49cba9e0270..eb02d40ec4a 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_ep.c +++ b/src/uct/ib/rc/verbs/rc_verbs_ep.c @@ -831,10 +831,6 @@ UCS_CLASS_INIT_FUNC(uct_rc_verbs_ep_t, uct_iface_h tl_iface) uct_rc_txqp_available_set(&self->super.txqp, iface->config.tx_max_wr); uct_rc_verbs_txcnt_init(&self->txcnt); - uct_worker_progress_add_safe(iface->super.super.super.worker, - iface->progress, iface, - &iface->super.super.super.prog); - return uct_rc_verbs_ep_tag_qp_create(iface, self); } @@ -842,8 +838,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_verbs_ep_t) { uct_rc_verbs_iface_t *iface = ucs_derived_of(self->super.super.super.iface, uct_rc_verbs_iface_t); - uct_worker_progress_remove(iface->super.super.super.worker, - &iface->super.super.super.prog); + /* NOTE: usually, ci == pi here, but if user calls * flush(UCT_FLUSH_FLAG_CANCEL) then ep_destroy without next progress, * TX-completion handler is not able to return CQ credits because diff --git a/src/uct/ib/rc/verbs/rc_verbs_iface.c b/src/uct/ib/rc/verbs/rc_verbs_iface.c index 2cfd15bc715..a77d9737f6a 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_iface.c +++ b/src/uct/ib/rc/verbs/rc_verbs_iface.c @@ -108,7 +108,7 @@ uct_rc_verbs_iface_poll_tx(uct_rc_verbs_iface_t *iface) return num_wcs; } -unsigned uct_rc_verbs_iface_progress(void *arg) +static unsigned uct_rc_verbs_iface_progress(void *arg) { uct_rc_verbs_iface_t *iface = arg; unsigned count; @@ -121,11 +121,10 @@ unsigned uct_rc_verbs_iface_progress(void *arg) return uct_rc_verbs_iface_poll_tx(iface); } -unsigned uct_rc_verbs_iface_do_progress(uct_iface_h tl_iface) +static unsigned uct_rc_verbs_iface_do_progress(uct_iface_h tl_iface) { uct_rc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_verbs_iface_t); - - return iface->progress(iface); + return iface->verbs_common.progress(iface); } #if IBV_EXP_HW_TM @@ -185,7 +184,7 @@ uct_rc_verbs_iface_tag_init(uct_rc_verbs_iface_t *iface, if (UCT_RC_VERBS_TM_ENABLED(&iface->verbs_common)) { struct ibv_exp_create_srq_attr srq_init_attr = {}; - iface->progress = uct_rc_verbs_iface_progress_tm; + iface->verbs_common.progress = uct_rc_verbs_iface_progress_tm; return uct_rc_verbs_iface_common_tag_init(&iface->verbs_common, &iface->super, @@ -195,7 +194,7 @@ uct_rc_verbs_iface_tag_init(uct_rc_verbs_iface_t *iface, sizeof(struct ibv_exp_tmh_rvh)); } #endif - iface->progress = uct_rc_verbs_iface_progress; + iface->verbs_common.progress = uct_rc_verbs_iface_progress; return UCS_OK; } @@ -261,6 +260,13 @@ static ucs_status_t uct_rc_verbs_iface_query(uct_iface_h tl_iface, uct_iface_att return UCS_OK; } +static void uct_rc_verbs_iface_progress_enable(uct_iface_h tl_iface, unsigned flags) +{ + uct_rc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_verbs_iface_t); + uct_rc_verbs_iface_common_progress_enable(&iface->verbs_common, &iface->super, + flags); +} + static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t, uct_md_h md, uct_worker_h worker, const uct_iface_params_t *params, const uct_iface_config_t *tl_config) @@ -334,6 +340,8 @@ static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t, uct_md_h md, uct_worker_h worke static UCS_CLASS_CLEANUP_FUNC(uct_rc_verbs_iface_t) { + uct_base_iface_progress_disable(&self->super.super.super.super, + UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); uct_rc_verbs_iface_common_tag_cleanup(&self->verbs_common); uct_rc_verbs_iface_common_cleanup(&self->verbs_common); } @@ -373,8 +381,8 @@ static uct_rc_iface_ops_t uct_rc_verbs_iface_ops = { .ep_connect_to_ep = uct_rc_verbs_ep_connect_to_ep, .iface_flush = uct_rc_iface_flush, .iface_fence = uct_base_iface_fence, - .iface_progress_enable = ucs_empty_function, - .iface_progress_disable = ucs_empty_function, + .iface_progress_enable = uct_rc_verbs_iface_progress_enable, + .iface_progress_disable = uct_base_iface_progress_disable, .iface_progress = uct_rc_verbs_iface_do_progress, #if IBV_EXP_HW_TM .iface_tag_recv_zcopy = uct_rc_verbs_iface_tag_recv_zcopy, diff --git a/src/uct/ib/ud/base/ud_iface.c b/src/uct/ib/ud/base/ud_iface.c index 6737e4a8bc9..bfb5bccecb5 100644 --- a/src/uct/ib/ud/base/ud_iface.c +++ b/src/uct/ib/ud/base/ud_iface.c @@ -367,8 +367,6 @@ ucs_status_t uct_ud_iface_complete_init(uct_ud_iface_t *iface) goto err_twheel_cleanup; } - uct_base_iface_progress_enable(&iface->super.super.super, - UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); return UCS_OK; err_twheel_cleanup: diff --git a/src/uct/sm/mm/mm_iface.c b/src/uct/sm/mm/mm_iface.c index 1a2b331747b..3b45622c94e 100644 --- a/src/uct/sm/mm/mm_iface.c +++ b/src/uct/sm/mm/mm_iface.c @@ -550,9 +550,6 @@ static UCS_CLASS_INIT_FUNC(uct_mm_iface_t, uct_md_h md, uct_worker_h worker, ucs_arbiter_init(&self->arbiter); - uct_base_iface_progress_enable(&self->super.super, - UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); - ucs_debug("Created an MM iface. FIFO mm id: %zu", self->fifo_mm_id); return UCS_OK; diff --git a/test/examples/uct_hello_world.c b/test/examples/uct_hello_world.c index 43381c5525e..8e1e8c0014b 100644 --- a/test/examples/uct_hello_world.c +++ b/test/examples/uct_hello_world.c @@ -231,6 +231,10 @@ static ucs_status_t init_iface(char *dev_name, char *tl_name, uct_config_release(config); CHKERR_JUMP(UCS_OK != status, "open temporary interface", error_ret); + /* Enable progress on the interface */ + uct_iface_progress_enable(iface_p->iface, + UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); + /* Get interface attributes */ status = uct_iface_query(iface_p->iface, &iface_p->attr); CHKERR_JUMP(UCS_OK != status, "query iface", error_iface); diff --git a/test/gtest/uct/uct_test.cc b/test/gtest/uct/uct_test.cc index 049224f3f05..745e3cd8e05 100644 --- a/test/gtest/uct/uct_test.cc +++ b/test/gtest/uct/uct_test.cc @@ -272,6 +272,8 @@ uct_test::entity::entity(const resource& resource, uct_iface_config_t *iface_con status = uct_iface_query(m_iface, &m_iface_attr); ASSERT_UCS_OK(status); + + uct_iface_progress_enable(m_iface, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); }