Skip to content

Commit

Permalink
Merge pull request #1891 from yosefe/topic/uct-iface-progress-disabled
Browse files Browse the repository at this point in the history
UCT/UCP: Disable interface progress by default.
  • Loading branch information
yosefe authored Oct 11, 2017
2 parents 18cee9d + b676392 commit 25615c7
Show file tree
Hide file tree
Showing 20 changed files with 132 additions and 72 deletions.
3 changes: 3 additions & 0 deletions src/tools/perf/libperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,9 @@ static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf, ucx_perf_params_t *
goto out_free_mem;
}

uct_iface_progress_enable(perf->uct.iface,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

return UCS_OK;

out_free_mem:
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ void ucp_context_tag_offload_enable(ucp_context_h context)

offload_iface = ucs_queue_head_elem_non_empty(&context->tm.offload.ifaces,
ucp_worker_iface_t, queue);
ucp_worker_iface_activate(offload_iface);
ucp_worker_iface_activate(offload_iface, 0);

ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu",
context->tm.offload.thresh, context->tm.offload.zcopy_thresh);
Expand Down
19 changes: 10 additions & 9 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
}
}

void ucp_worker_iface_activate(ucp_worker_iface_t *wiface)
void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags)
{
ucp_worker_h worker = wiface->worker;
ucs_status_t status;
Expand All @@ -408,7 +408,7 @@ void ucp_worker_iface_activate(ucp_worker_iface_t *wiface)
}

uct_iface_progress_enable(wiface->iface,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV | uct_flags);
}

static void ucp_worker_iface_deactivate(ucp_worker_iface_t *wiface, int force)
Expand Down Expand Up @@ -451,7 +451,12 @@ void ucp_worker_iface_progress_ep(ucp_worker_iface_t *wiface)
ucs_trace_func("iface=%p", wiface->iface);

UCS_ASYNC_BLOCK(&wiface->worker->async);
ucp_worker_iface_activate(wiface);

/* This function may be called from the progress thread (such as when
* processing wireup messages), so ask UCT to be thread-safe.
*/
ucp_worker_iface_activate(wiface, UCT_PROGRESS_THREAD_SAFE);

UCS_ASYNC_UNBLOCK(&wiface->worker->async);
}

Expand Down Expand Up @@ -488,7 +493,7 @@ static ucs_status_t ucp_worker_iface_check_events_do(ucp_worker_iface_t *wiface,
*progress_count = uct_iface_progress(wiface->iface);
if (prev_am_count != wiface->proxy_am_count) {
/* Received relevant active messages, activate the interface */
ucp_worker_iface_activate(wiface);
ucp_worker_iface_activate(wiface, 0);
return UCS_OK;
} else if (*progress_count == 0) {
/* Arm the interface to wait for next event */
Expand Down Expand Up @@ -682,10 +687,6 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id,
tl_id, wiface->iface, UCT_TL_RESOURCE_DESC_ARG(&resource->tl_rsc),
worker);

/* Disable progress until we know better */
uct_iface_progress_disable(wiface->iface, UCT_PROGRESS_SEND |
UCT_PROGRESS_RECV);

VALGRIND_MAKE_MEM_UNDEFINED(&wiface->attr, sizeof(wiface->attr));
status = uct_iface_query(wiface->iface, &wiface->attr);
if (status != UCS_OK) {
Expand Down Expand Up @@ -725,7 +726,7 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id,
{
ucp_worker_iface_deactivate(wiface, 1);
} else {
ucp_worker_iface_activate(wiface);
ucp_worker_iface_activate(wiface, 0);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ void ucp_worker_iface_unprogress_ep(ucp_worker_iface_t *wiface);

void ucp_worker_signal_internal(ucp_worker_h worker);

void ucp_worker_iface_activate(ucp_worker_iface_t *wiface);
void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags);

static inline const char* ucp_worker_get_name(ucp_worker_h worker)
{
Expand Down
21 changes: 16 additions & 5 deletions src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,11 @@ enum uct_flush_flags {
* @brief UCT progress types
*/
enum uct_progress_types {
UCT_PROGRESS_SEND = UCS_BIT(0), /**< Progress send operations */
UCT_PROGRESS_RECV = UCS_BIT(1) /**< Progress receive operations */
UCT_PROGRESS_SEND = UCS_BIT(0), /**< Progress send operations */
UCT_PROGRESS_RECV = UCS_BIT(1), /**< Progress receive operations */
UCT_PROGRESS_THREAD_SAFE = UCS_BIT(7) /**< Enable/disable progress while
another thread may be calling
@ref uct_worker_progress(). */
};


Expand Down Expand Up @@ -2306,11 +2309,15 @@ UCT_INLINE_API ucs_status_t uct_iface_tag_recv_cancel(uct_iface_h iface,
* Notify the transport that it should actively progress communications during
* @ref uct_worker_progress().
*
* When the interface is created, its progress is enabled.
* When the interface is created, its progress is initially disabled.
*
* @param [in] iface The interface to enable progress.
* @param [in] flags The type of progress to enable as defined by
* @ref uct_progress_types.
* @ref uct_progress_types
*
* @note This function is not thread safe with respect to
* @ref uct_worker_progress(), unless the flag
* @ref UCT_PROGRESS_THREAD_SAFE is specified.
*
*/
UCT_INLINE_API void uct_iface_progress_enable(uct_iface_h iface, unsigned flags)
Expand All @@ -2327,12 +2334,16 @@ UCT_INLINE_API void uct_iface_progress_enable(uct_iface_h iface, unsigned flags)
* @ref uct_worker_progress(). Thus the latency of other transports may be
* improved.
*
* By default, progress is enabled when the interface is created.
* By default, progress is disabled when the interface is created.
*
* @param [in] iface The interface to disable progress.
* @param [in] flags The type of progress to disable as defined by
* @ref uct_progress_types.
*
* @note This function is not thread safe with respect to
* @ref uct_worker_progress(), unless the flag
* @ref UCT_PROGRESS_THREAD_SAFE is specified.
*
*/
UCT_INLINE_API void uct_iface_progress_disable(uct_iface_h iface, unsigned flags)
{
Expand Down
37 changes: 30 additions & 7 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,31 @@ void uct_iface_close(uct_iface_h iface)
/**
 * Enable progress on a base interface, using the progress function taken
 * from the interface's own operations table as the callback.
 *
 * @param [in] tl_iface  Interface handle; treated as a uct_base_iface_t.
 * @param [in] flags     Progress flags, forwarded unchanged to
 *                       uct_base_iface_progress_enable_cb().
 */
void uct_base_iface_progress_enable(uct_iface_h tl_iface, unsigned flags)
{
    uct_base_iface_t *base_iface = ucs_derived_of(tl_iface, uct_base_iface_t);
    ucs_callback_t progress_cb;

    /* The callback is the transport's own iface_progress operation */
    progress_cb = (ucs_callback_t)base_iface->super.ops.iface_progress;

    uct_base_iface_progress_enable_cb(base_iface, progress_cb, flags);
}

void uct_base_iface_progress_enable_cb(uct_base_iface_t *iface,
ucs_callback_t cb, unsigned flags)
{
uct_priv_worker_t *worker = iface->worker;
unsigned thread_safe;

UCS_ASYNC_BLOCK(worker->async);

thread_safe = flags & UCT_PROGRESS_THREAD_SAFE;
flags &= ~UCT_PROGRESS_THREAD_SAFE;

/* Add callback only if previous flags are 0 and new flags != 0 */
if (!iface->progress_flags && flags) {
if (iface->prog.id == UCS_CALLBACKQ_ID_NULL) {
iface->prog.id = ucs_callbackq_add(&worker->super.progress_q,
(ucs_callback_t)iface->super.ops.iface_progress,
if ((!iface->progress_flags && flags) &&
(iface->prog.id == UCS_CALLBACKQ_ID_NULL)) {
if (thread_safe) {
iface->prog.id = ucs_callbackq_add_safe(&worker->super.progress_q,
cb, iface,
UCS_CALLBACKQ_FLAG_FAST);
} else {
iface->prog.id = ucs_callbackq_add(&worker->super.progress_q, cb,
iface, UCS_CALLBACKQ_FLAG_FAST);
}
}
Expand All @@ -206,17 +222,24 @@ void uct_base_iface_progress_disable(uct_iface_h tl_iface, unsigned flags)
{
uct_base_iface_t *iface = ucs_derived_of(tl_iface, uct_base_iface_t);
uct_priv_worker_t *worker = iface->worker;
unsigned thread_safe;

UCS_ASYNC_BLOCK(worker->async);

thread_safe = flags & UCT_PROGRESS_THREAD_SAFE;
flags &= ~UCT_PROGRESS_THREAD_SAFE;

/* Remove callback only if previous flags != 0, and removing the given
* flags makes it become 0.
*/
if (iface->progress_flags && !(iface->progress_flags & ~flags)) {
if (iface->prog.id != UCS_CALLBACKQ_ID_NULL) {
if ((iface->progress_flags && !(iface->progress_flags & ~flags)) &&
(iface->prog.id != UCS_CALLBACKQ_ID_NULL)) {
if (thread_safe) {
ucs_callbackq_remove_safe(&worker->super.progress_q, iface->prog.id);
} else {
ucs_callbackq_remove(&worker->super.progress_q, iface->prog.id);
iface->prog.id = UCS_CALLBACKQ_ID_NULL;
}
iface->prog.id = UCS_CALLBACKQ_ID_NULL;
}
iface->progress_flags &= ~flags;

Expand Down
3 changes: 3 additions & 0 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags);

void uct_base_iface_progress_enable(uct_iface_h tl_iface, unsigned flags);

void uct_base_iface_progress_enable_cb(uct_base_iface_t *iface,
ucs_callback_t cb, unsigned flags);

void uct_base_iface_progress_disable(uct_iface_h tl_iface, unsigned flags);

ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags,
Expand Down
3 changes: 0 additions & 3 deletions src/uct/ib/dc/accel/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,6 @@ static UCS_CLASS_INIT_FUNC(uct_dc_mlx5_iface_t, uct_md_h md, uct_worker_h worker
UCT_IB_MLX5_AV_FULL_SIZE) /
sizeof(struct mlx5_wqe_data_seg));

/* TODO: only register progress when we have a connection */
uct_base_iface_progress_enable(&self->super.super.super.super.super,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
ucs_debug("created dc iface %p", self);
return UCS_OK;

Expand Down
38 changes: 25 additions & 13 deletions src/uct/ib/dc/verbs/dc_verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -767,9 +767,9 @@ uct_dc_verbs_poll_tx(uct_dc_verbs_iface_t *iface)
return num_wcs;
}

static unsigned uct_dc_verbs_iface_progress(uct_iface_h tl_iface)
static unsigned uct_dc_verbs_iface_progress(void *arg)
{
uct_dc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_verbs_iface_t);
uct_dc_verbs_iface_t *iface = arg;
unsigned count;

count = uct_rc_verbs_iface_poll_rx_common(&iface->super.super);
Expand Down Expand Up @@ -978,9 +978,9 @@ static ucs_status_t uct_dc_verbs_iface_tag_recv_cancel(uct_iface_h tl_iface,
ctx, force);
}

static unsigned uct_dc_verbs_iface_progress_tm(uct_iface_h tl_iface)
static unsigned uct_dc_verbs_iface_progress_tm(void *arg)
{
uct_dc_verbs_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_verbs_iface_t);
uct_dc_verbs_iface_t *iface = arg;
unsigned count;

count = uct_rc_verbs_iface_poll_rx_tm(&iface->verbs_common,
Expand All @@ -997,9 +997,8 @@ static ucs_status_t
uct_dc_verbs_iface_tag_init(uct_dc_verbs_iface_t *iface,
uct_dc_verbs_iface_config_t *config)
{
uct_iface_t *tl_iface = &iface->super.super.super.super.super;

#if IBV_EXP_HW_TM_DC

if (UCT_RC_VERBS_TM_ENABLED(&iface->verbs_common)) {
struct ibv_exp_create_srq_attr srq_init_attr = {};
struct ibv_exp_srq_dc_offload_params dc_op = {};
Expand Down Expand Up @@ -1032,13 +1031,13 @@ uct_dc_verbs_iface_tag_init(uct_dc_verbs_iface_t *iface,
return status;
}


tl_iface->ops.iface_progress = uct_dc_verbs_iface_progress_tm;
}
iface->verbs_common.progress = uct_dc_verbs_iface_progress_tm;
} else
#endif
{
iface->verbs_common.progress = uct_dc_verbs_iface_progress;
}

uct_base_iface_progress_enable(tl_iface,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
return UCS_OK;
}

Expand Down Expand Up @@ -1096,6 +1095,19 @@ uct_dc_verbs_iface_event_arm(uct_iface_h tl_iface, unsigned events)
UCT_RC_VERBS_TM_ENABLED(&iface->verbs_common));
}

/**
 * iface_progress_enable operation for the DC verbs interface: forwards the
 * request to the common RC verbs helper, passing this interface's verbs
 * common state and its RC interface part.
 *
 * @param [in] tl_iface  Interface handle; treated as a uct_dc_verbs_iface_t.
 * @param [in] flags     Progress flags, forwarded unchanged.
 */
static void uct_dc_verbs_iface_progress_enable(uct_iface_h tl_iface, unsigned flags)
{
    uct_dc_verbs_iface_t *dc_iface = ucs_derived_of(tl_iface,
                                                    uct_dc_verbs_iface_t);

    uct_rc_verbs_iface_common_progress_enable(&dc_iface->verbs_common,
                                              &dc_iface->super.super,
                                              flags);
}

/**
 * iface_progress operation for the DC verbs interface: dispatches to the
 * progress callback stored in the verbs common state.
 *
 * @param [in] tl_iface  Interface handle; treated as a uct_dc_verbs_iface_t.
 *
 * @return The value returned by the stored progress callback.
 */
static unsigned uct_dc_verbs_iface_do_progress(uct_iface_h tl_iface)
{
    uct_dc_verbs_iface_t *dc_iface = ucs_derived_of(tl_iface,
                                                    uct_dc_verbs_iface_t);
    unsigned count;

    /* The interface itself is the callback argument */
    count = dc_iface->verbs_common.progress(dc_iface);
    return count;
}

static void UCS_CLASS_DELETE_FUNC_NAME(uct_dc_verbs_iface_t)(uct_iface_t*);

static uct_dc_iface_ops_t uct_dc_verbs_iface_ops = {
Expand Down Expand Up @@ -1136,9 +1148,9 @@ static uct_dc_iface_ops_t uct_dc_verbs_iface_ops = {
#endif
.iface_flush = uct_dc_iface_flush,
.iface_fence = uct_base_iface_fence,
.iface_progress_enable = uct_base_iface_progress_enable,
.iface_progress_enable = uct_dc_verbs_iface_progress_enable,
.iface_progress_disable = uct_base_iface_progress_disable,
.iface_progress = uct_dc_verbs_iface_progress,
.iface_progress = uct_dc_verbs_iface_do_progress,
.iface_event_fd_get = uct_ib_iface_event_fd_get,
.iface_event_arm = uct_dc_verbs_iface_event_arm,
.iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_dc_verbs_iface_t),
Expand Down
6 changes: 0 additions & 6 deletions src/uct/ib/rc/accel/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,6 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, uct_iface_h tl_iface)
self->qp_num = self->super.txqp.qp->qp_num;
self->tx.wq.bb_max = ucs_min(self->tx.wq.bb_max, iface->tx.bb_max);
uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max);

uct_worker_progress_add_safe(iface->super.super.super.worker,
uct_rc_mlx5_iface_progress, iface,
&iface->super.super.super.prog);
return UCS_OK;
}

Expand All @@ -455,8 +451,6 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_ep_t)
uct_rc_mlx5_iface_t *iface = ucs_derived_of(self->super.super.super.iface,
uct_rc_mlx5_iface_t);

uct_worker_progress_remove(iface->super.super.super.worker,
&iface->super.super.super.prog);
uct_ib_mlx5_txwq_cleanup(&self->tx.wq);

/* Modify QP to error to make HW generate CQEs for all in-progress SRQ
Expand Down
6 changes: 4 additions & 2 deletions src/uct/ib/rc/accel/rc_mlx5_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ static UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_t, uct_md_h md, uct_worker_h worker

static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_iface_t)
{
uct_base_iface_progress_disable(&self->super.super.super.super,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common);
}

Expand Down Expand Up @@ -213,8 +215,8 @@ static uct_rc_iface_ops_t uct_rc_mlx5_iface_ops = {
.ep_connect_to_ep = uct_rc_ep_connect_to_ep,
.iface_flush = uct_rc_iface_flush,
.iface_fence = uct_base_iface_fence,
.iface_progress_enable = ucs_empty_function,
.iface_progress_disable = ucs_empty_function,
.iface_progress_enable = uct_base_iface_progress_enable,
.iface_progress_disable = uct_base_iface_progress_disable,
.iface_progress = (void*)uct_rc_mlx5_iface_progress,
.iface_event_fd_get = uct_ib_iface_event_fd_get,
.iface_event_arm = uct_rc_iface_event_arm,
Expand Down
5 changes: 0 additions & 5 deletions src/uct/ib/rc/verbs/rc_verbs.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ typedef struct uct_rc_verbs_iface {
struct {
unsigned tx_max_wr;
} config;

/* Progress function (either regular or TM aware) */
ucs_callback_t progress;
} uct_rc_verbs_iface_t;


Expand Down Expand Up @@ -198,8 +195,6 @@ ucs_status_t uct_rc_verbs_ep_connect_to_ep(uct_ep_h tl_ep,

ucs_status_t uct_rc_verbs_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr);

unsigned uct_rc_verbs_iface_progress(void *arg);

ucs_status_t uct_rc_verbs_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
uct_rc_fc_request_t *req);

Expand Down
9 changes: 8 additions & 1 deletion src/uct/ib/rc/verbs/rc_verbs_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,15 @@ ucs_status_t uct_rc_verbs_iface_prepost_recvs_common(uct_rc_iface_t *iface)
return UCS_OK;
}

#if IBV_EXP_HW_TM
/**
 * Enable progress on an RC interface, using the progress callback stored in
 * the verbs common state.
 *
 * @param [in] iface     Verbs common state that holds the progress callback.
 * @param [in] rc_iface  RC interface whose base interface gets progress
 *                       enabled.
 * @param [in] flags     Progress flags, forwarded unchanged to
 *                       uct_base_iface_progress_enable_cb().
 */
void uct_rc_verbs_iface_common_progress_enable(uct_rc_verbs_iface_common_t *iface,
                                               uct_rc_iface_t *rc_iface,
                                               unsigned flags)
{
    uct_base_iface_t *base_iface = &rc_iface->super.super;

    uct_base_iface_progress_enable_cb(base_iface, iface->progress, flags);
}

#if IBV_EXP_HW_TM

static void uct_rc_verbs_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
Expand Down
Loading

0 comments on commit 25615c7

Please sign in to comment.