Skip to content

Commit

Permalink
Merge pull request #9507 from iyastreb/ucp/rndv/fine-grained-config
Browse files Browse the repository at this point in the history
UCP: Fine-grained intra/inter config for rendezvous
  • Loading branch information
yosefe authored Jan 22, 2024
2 parents 0383cdb + 9ab2149 commit 9d18b6b
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 24 deletions.
10 changes: 8 additions & 2 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,14 @@ static ucs_config_field_t ucp_context_config_table[] = {
ucs_offsetof(ucp_context_config_t, bcopy_thresh), UCS_CONFIG_TYPE_MEMUNITS},

{"RNDV_THRESH", UCS_VALUE_AUTO_STR,
"Threshold for switching from eager to rendezvous protocol",
ucs_offsetof(ucp_context_config_t, rndv_thresh), UCS_CONFIG_TYPE_MEMUNITS},
"Threshold for switching from eager to rendezvous protocol", 0,
UCS_CONFIG_TYPE_KEY_VALUE(UCS_CONFIG_TYPE_MEMUNITS,
{"intra", "threshold for intra-node communication",
ucs_offsetof(ucp_context_config_t, rndv_intra_thresh)},
{"inter", "threshold for inter-node communication",
ucs_offsetof(ucp_context_config_t, rndv_inter_thresh)},
{NULL}
)},

{"RNDV_SEND_NBR_THRESH", "256k",
"Threshold for switching from eager to rendezvous protocol in ucp_tag_send_nbr().\n"
Expand Down
6 changes: 4 additions & 2 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ enum {
typedef struct ucp_context_config {
/** Threshold for switching UCP to buffered copy(bcopy) protocol */
size_t bcopy_thresh;
/** Threshold for switching UCP to rendezvous protocol */
size_t rndv_thresh;
/** Threshold for switching UCP to rendezvous protocol for intra-node */
size_t rndv_intra_thresh;
/** Threshold for switching UCP to rendezvous protocol for inter-node */
size_t rndv_inter_thresh;
/** Threshold for switching UCP to rendezvous protocol
* in ucp_tag_send_nbr() */
size_t rndv_send_nbr_thresh;
Expand Down
16 changes: 8 additions & 8 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2183,7 +2183,7 @@ ucp_ep_config_set_am_rndv_thresh(ucp_worker_h worker,
ucs_assert(config->key.am_lane != UCP_NULL_LANE);
ucs_assert(config->key.lanes[config->key.am_lane].rsc_index != UCP_NULL_RESOURCE);

if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
if (context->config.ext.rndv_inter_thresh == UCS_MEMUNITS_AUTO) {
/* auto - Make UCX calculate the AM rndv threshold on its own.*/
status = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
Expand All @@ -2196,8 +2196,8 @@ ucp_ep_config_set_am_rndv_thresh(ucp_worker_h worker,
rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh;
ucs_trace("active message rendezvous threshold is %zu", rndv_thresh);
} else {
rndv_thresh = context->config.ext.rndv_thresh;
rndv_local_thresh = context->config.ext.rndv_thresh;
rndv_thresh = context->config.ext.rndv_inter_thresh;
rndv_local_thresh = context->config.ext.rndv_inter_thresh;
}

min_thresh = ucs_max(iface_attr->cap.am.min_zcopy, min_rndv_thresh);
Expand Down Expand Up @@ -2233,7 +2233,7 @@ ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, ucp_ep_config_t *config,

iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);

if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
if (context->config.ext.rndv_inter_thresh == UCS_MEMUNITS_AUTO) {
/* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own.*/
status = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
Expand All @@ -2244,8 +2244,8 @@ ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, ucp_ep_config_t *config,

rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh;
} else {
rndv_thresh = context->config.ext.rndv_thresh;
rndv_local_thresh = context->config.ext.rndv_thresh;
rndv_thresh = context->config.ext.rndv_inter_thresh;
rndv_local_thresh = context->config.ext.rndv_inter_thresh;
}

min_thresh = ucs_max(iface_attr->cap.get.min_zcopy, min_rndv_thresh);
Expand Down Expand Up @@ -2371,8 +2371,8 @@ ucp_ep_config_max_short(ucp_context_t *context, uct_iface_attr_t *iface_attr,
}

if ((rndv_thresh != NULL) &&
(context->config.ext.rndv_thresh != UCS_MEMUNITS_AUTO)) {
/* Adjust max_short if rndv_thresh is set externally. Note local and
(context->config.ext.rndv_inter_thresh != UCS_MEMUNITS_AUTO)) {
/* Adjust max_short if rndv_inter_thresh is set externally. Note local and
* remote threshold values are the same if set externally, so can
* compare with just one of them. */
ucs_assert(rndv_thresh->remote == rndv_thresh->local);
Expand Down
7 changes: 7 additions & 0 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ ucp_ep_config_key_has_cm_lane(const ucp_ep_config_key_t *config_key)
return config_key->cm_lane != UCP_NULL_LANE;
}

/**
 * Check whether an endpoint configuration key is neither "self" nor
 * "intra-node".
 *
 * @param [in] config_key  Endpoint configuration key whose flags are tested.
 *
 * @return Non-zero if neither UCP_EP_CONFIG_KEY_FLAG_SELF nor
 *         UCP_EP_CONFIG_KEY_FLAG_INTRA_NODE is set in the key flags,
 *         0 otherwise.
 */
static UCS_F_ALWAYS_INLINE int
ucp_ep_config_is_inter_node(const ucp_ep_config_key_t *config_key)
{
    return (config_key->flags & (UCP_EP_CONFIG_KEY_FLAG_INTRA_NODE |
                                 UCP_EP_CONFIG_KEY_FLAG_SELF)) == 0;
}

static inline int ucp_ep_has_cm_lane(ucp_ep_h ep)
{
return (ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
Expand Down
8 changes: 7 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ static ucs_status_t ucp_worker_wakeup_init(ucp_worker_h worker,
*/
if ((events & UCP_WAKEUP_TAG_SEND) ||
((events & UCP_WAKEUP_TAG_RECV) &&
(context->config.ext.rndv_thresh != UCS_MEMUNITS_INF)))
((context->config.ext.rndv_intra_thresh != UCS_MEMUNITS_INF) ||
(context->config.ext.rndv_inter_thresh != UCS_MEMUNITS_INF))))
{
worker->uct_events |= UCT_EVENT_SEND_COMP;
}
Expand Down Expand Up @@ -2159,6 +2160,11 @@ ucs_status_t ucp_worker_get_ep_config(ucp_worker_h worker,
UCP_FEATURE_AM, UCP_OP_ID_AM_SEND,
UCP_PROTO_FLAG_AM_SHORT, key->am_lane,
&ep_config->am_u.max_eager_short);

ucp_worker_ep_config_short_init(worker, ep_config, ep_cfg_index,
UCP_FEATURE_AM, UCP_OP_ID_AM_SEND_REPLY,
UCP_PROTO_FLAG_AM_SHORT, key->am_lane,
&ep_config->am_u.max_reply_eager_short);
}

ucp_worker_print_used_tls(worker, ep_cfg_index);
Expand Down
18 changes: 14 additions & 4 deletions src/ucp/rndv/proto_rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,19 +387,29 @@ ucp_proto_rndv_ctrl_init(const ucp_proto_rndv_ctrl_init_params_t *params,
return status;
}

/*
 * Select the configured rendezvous threshold for a protocol being
 * initialized.
 *
 * NOTE(review): this span carries both the pre-change lines (static
 * signature, direct reads of cfg->rndv_thresh) and the post-change lines of
 * the function; the description below follows the post-change lines -
 * confirm against the merged file.
 *
 * The threshold is read from rndv_inter_thresh when
 * ucp_ep_config_is_inter_node() is true for the endpoint configuration key,
 * and from rndv_intra_thresh otherwise. When the selected value is
 * UCS_MEMUNITS_AUTO, the operation attributes contain
 * UCP_OP_ATTR_FLAG_FAST_CMPL and the memory type is host memory,
 * rndv_send_nbr_thresh is returned instead.
 */
static size_t ucp_proto_rndv_thresh(const ucp_proto_init_params_t *init_params)
size_t ucp_proto_rndv_thresh(const ucp_proto_init_params_t *init_params)
{
const ucp_proto_select_param_t *select_param = init_params->select_param;
const ucp_context_config_t *cfg = &init_params->worker->context->config.ext;
size_t rndv_thresh;

if ((cfg->rndv_thresh == UCS_MEMUNITS_AUTO) &&
/* Explicit configuration for rendezvous threshold (either fine-grained or
* coarse-grained) has precedence over rndv_send_nbr_thresh
*/
if (ucp_ep_config_is_inter_node(init_params->ep_config_key)) {
rndv_thresh = cfg->rndv_inter_thresh;
} else {
rndv_thresh = cfg->rndv_intra_thresh;
}

if ((rndv_thresh == UCS_MEMUNITS_AUTO) &&
(ucp_proto_select_op_attr_unpack(select_param->op_attr) &
UCP_OP_ATTR_FLAG_FAST_CMPL) &&
ucs_likely(UCP_MEM_IS_HOST(select_param->mem_type))) {
return cfg->rndv_send_nbr_thresh;
rndv_thresh = cfg->rndv_send_nbr_thresh;
}

return cfg->rndv_thresh;
return rndv_thresh;
}

ucs_status_t
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/rndv/proto_rndv.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ typedef struct {

} ucp_proto_rndv_ctrl_init_params_t;

/**
 * Return rendezvous threshold for the provided configuration.
 *
 * @param [in] init_params  Protocol initialization parameters; the worker
 *                          context configuration, the endpoint configuration
 *                          key and the selection parameters are read from it.
 *
 * @return Rendezvous threshold taken from the context configuration
 *         (intra-node or inter-node value, or the send_nbr threshold as a
 *         fallback for the "auto" setting).
 */
size_t ucp_proto_rndv_thresh(const ucp_proto_init_params_t *init_params);

/* Initializes protocol which sends rendezvous control message using AM lane
* (e.g. RTS and ATS). */
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/tag/offload/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ucp_tag_rndv_offload_proto_init(const ucp_proto_init_params_t *init_params)
.super.super = *init_params,
.super.latency = 0,
.super.overhead = 40e-9,
.super.cfg_thresh = context->config.ext.rndv_thresh,
.super.cfg_thresh = ucp_proto_rndv_thresh(init_params),
.super.cfg_priority = 60,
.super.min_length = ucp_ep_tag_offload_min_rndv_thresh(
context, init_params->ep_config_key),
Expand Down Expand Up @@ -158,7 +158,7 @@ ucp_tag_rndv_offload_sw_proto_init(const ucp_proto_init_params_t *init_params)
.super.super = *init_params,
.super.latency = 0,
.super.overhead = 40e-9,
.super.cfg_thresh = context->config.ext.rndv_thresh,
.super.cfg_thresh = ucp_proto_rndv_thresh(init_params),
.super.cfg_priority = 60,
.super.min_length = ucp_ep_tag_offload_min_rndv_thresh(
context, init_params->ep_config_key),
Expand Down
183 changes: 178 additions & 5 deletions test/gtest/ucp/test_ucp_tag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern "C" {
#include <ucs/arch/atomic.h>
#include <ucs/memory/rcache.h>
#include <ucs/memory/rcache_int.h>
#include <ucp/proto/proto_select.inl>
}

#include <sys/mman.h>
Expand Down Expand Up @@ -423,22 +424,134 @@ class test_ucp_tag_limits : public test_ucp_tag {
public:
/* Store the test variant value as the offload switch, apply the
 * UCX_RC_TM_ENABLE environment override through a scoped_setenv kept in
 * m_env, and reset m_min_rndv to 0.
 * NOTE(review): both the unconditional and the conditional
 * UCX_RC_TM_ENABLE settings are present in this span - confirm which one
 * the merged file keeps. */
test_ucp_tag_limits() {
m_test_offload = get_variant_value();
m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE",
ucs::to_string(m_test_offload).c_str()));
if (m_test_offload) {
m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "y"));
}
m_min_rndv = 0;
}

void init() {
/* TODO: Currently all the tests are for intra-node communication only.
* Find a way to create inter-node endpoint on a single node */
test_ucp_tag::init();

check_offload_support(m_test_offload);

if (m_test_offload) {
ucp_ep_config_t *cfg = ucp_ep_config(sender().ep());
m_min_rndv = ucp_ep_tag_offload_min_rndv_thresh(sender().ucph(),
&cfg->key);
}
}

/* Register the test variants: value 0 with an empty name and value 1 named
 * "offload". The context parameters used for the variants below request both
 * UCP_FEATURE_TAG and UCP_FEATURE_AM.
 * NOTE(review): the registrations based directly on get_ctx_params() and
 * the ones based on the local params copy both appear in this span -
 * confirm which pair the merged file keeps. */
static void get_test_variants(std::vector<ucp_test_variant>& variants) {
add_variant_with_value(variants, get_ctx_params(), 0, "");
add_variant_with_value(variants, get_ctx_params(), 1, "offload");
ucp_params_t params = get_ctx_params();
params.features = UCP_FEATURE_TAG | UCP_FEATURE_AM;

add_variant_with_value(variants, params, 0, "");
add_variant_with_value(variants, params, 1, "offload");
}

protected:
bool m_test_offload;
bool m_test_offload;
size_t m_min_rndv;

/* Compare both short-message limits of thresh (memtype_on and memtype_off),
 * each incremented by one, against cfg_thresh. With strict set the values
 * must be equal; otherwise they must be less than or equal to cfg_thresh. */
static void check_short_thresh(const ucp_memtype_thresh_t &thresh,
                               size_t cfg_thresh, bool strict = false)
{
    if (!strict) {
        EXPECT_LE(thresh.memtype_on + 1, cfg_thresh);
        EXPECT_LE(thresh.memtype_off + 1, cfg_thresh);
        return;
    }

    EXPECT_EQ(thresh.memtype_on + 1, cfg_thresh);
    EXPECT_EQ(thresh.memtype_off + 1, cfg_thresh);
}

void check_rndv_startup_config(size_t exp_rndv_intra_thresh,
size_t exp_rndv_inter_thresh)
{
ucp_context_config_t *cfg = &sender().worker()->context->config.ext;

EXPECT_EQ(exp_rndv_intra_thresh, cfg->rndv_intra_thresh);
EXPECT_EQ(exp_rndv_inter_thresh, cfg->rndv_inter_thresh);
}

void check_tag_rndv_v2(size_t cfg_thresh)
{
ucp_ep_config_t *cfg = ucp_ep_config(sender().ep());

if (m_test_offload) {
/* If configured threshold is less than min_rndv, then expect exact
* min_rndv limit for short messages */
if (cfg_thresh < m_min_rndv) {
check_short_thresh(cfg->tag.offload.max_eager_short, m_min_rndv,
true);
} else {
check_short_thresh(cfg->tag.offload.max_eager_short, cfg_thresh);
}
} else {
check_short_thresh(cfg->tag.max_eager_short, cfg_thresh);
}
}

void check_am_rndv_v2(size_t cfg_thresh)
{
ucp_ep_config_t *cfg = ucp_ep_config(sender().ep());

check_short_thresh(cfg->am_u.max_eager_short, cfg_thresh);
check_short_thresh(cfg->am_u.max_reply_eager_short, cfg_thresh);
}

/* Walk every protocol selection entry of the sender endpoint configuration
 * and check the protocol chosen for a message of cfg_thresh bytes (raised to
 * m_min_rndv for the offload variant).
 *
 * All threshold ranges below that message size must not name a "rndv"
 * protocol. For the range that covers the message size:
 *  - expect_rndv == true:  its cfg_thresh equals cfg_thresh and its protocol
 *    name contains "rndv";
 *  - expect_rndv == false: (non-offload variant only) its cfg_thresh differs
 *    from cfg_thresh and its protocol name does not contain "rndv". */
void check_ep_proto_rndv_v2(size_t cfg_thresh, bool expect_rndv)
{
    ucp_ep_config_t *cfg             = ucp_ep_config(sender().ep());
    ucp_proto_select_t *proto_select = &cfg->proto_select;
    size_t msg_length                = cfg_thresh;
    ucp_proto_select_elem_t value;

    if (m_test_offload) {
        /* There is a lower bound for rndv threshold with tag offload.
         * We should not send messages smaller than this limit */
        msg_length = ucs_max(m_min_rndv, msg_length);
    }

    kh_foreach_value(proto_select->hash, value, {
        /* Advance to the ucp_proto_threshold_elem_t that handles the given
         * message size; none of the skipped ranges may use rndv */
        unsigned idx = 0;
        while (msg_length > value.thresholds[idx].max_msg_length) {
            const ucp_proto_config_t *proto_config =
                    &value.thresholds[idx].proto_config;
            /* Assert no rndv before expected limit */
            EXPECT_EQ(nullptr, strstr(proto_config->proto->name, "rndv"));
            ++idx;
        }

        const ucp_proto_config_t *proto_config =
                &value.thresholds[idx].proto_config;

        if (expect_rndv) {
            EXPECT_EQ(proto_config->cfg_thresh, cfg_thresh);
            EXPECT_NE(nullptr, strstr(proto_config->proto->name, "rndv"));
        } else if (!m_test_offload) {
            /* Skip check that rndv is disabled for tag offload use case.
             * With tag offload rndv protocol might still be used for large
             * messages, because of HWTM limitation for eager transfers. */
            EXPECT_NE(proto_config->cfg_thresh, cfg_thresh);
            EXPECT_EQ(nullptr, strstr(proto_config->proto->name, "rndv"));
        }
    });
}

void check_rndv_threshold(size_t cfg_thresh)
{
ucp_context_config_t *cfg = &sender().worker()->context->config.ext;
if (cfg->proto_enable) {
/* Check proto_v2 rndv thresholds only when this protocol is
* enabled, otherwise these checks are irrelevant */
check_tag_rndv_v2(cfg_thresh);
check_am_rndv_v2(cfg_thresh);
check_ep_proto_rndv_v2(cfg_thresh, true);
}
}
};

UCS_TEST_P(test_ucp_tag_limits, check_max_short_rndv_thresh_zero, "RNDV_THRESH=0") {
Expand Down Expand Up @@ -482,6 +595,66 @@ UCS_TEST_P(test_ucp_tag_limits, check_max_short_zcopy_thresh_zero, "ZCOPY_THRESH
ucp_ep_config(sender().ep())->tag.eager.zcopy_thresh[0]);
}

/* A single RNDV_THRESH value with no intra/inter keys: both the intra-node
 * and the inter-node thresholds are expected to be 0, and the endpoint
 * thresholds are checked against 0. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_thresh,
"RNDV_THRESH=0")
{
check_rndv_startup_config(0, 0);
check_rndv_threshold(0);
}

/* "auto" as the common value plus an intra-node override: the intra-node
 * threshold is expected to be 20 while the inter-node one stays "auto".
 * The endpoint thresholds are checked against the intra-node value. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_intra_thresh,
"RNDV_THRESH=auto,intra:20")
{
check_rndv_startup_config(20, UCS_MEMUNITS_AUTO);
check_rndv_threshold(20);
}

/* Same as the intra-node override case, with a larger value (2000): the
 * inter-node threshold stays "auto" and the endpoint thresholds are checked
 * against 2000. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_intra_thresh_large,
"RNDV_THRESH=auto,intra:2000")
{
check_rndv_startup_config(2000, UCS_MEMUNITS_AUTO);
check_rndv_threshold(2000);
}

/* Intra-node override set to "inf": the intra-node threshold is expected to
 * be UCS_MEMUNITS_INF, the inter-node one stays "auto", and the protocol
 * selection is checked with expect_rndv == false. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_intra_thresh_inf,
"RNDV_THRESH=auto,intra:inf")
{
check_rndv_startup_config(UCS_MEMUNITS_INF, UCS_MEMUNITS_AUTO);
/* check that rndv protocol is disabled */
check_ep_proto_rndv_v2(UCS_MEMUNITS_INF, false);
}

/* Common value 10 plus an intra-node override 20: the intra-node threshold
 * is expected to be 20 and the inter-node threshold to take the common
 * value 10. The endpoint thresholds are checked against 20. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_intra_thresh_common,
"RNDV_THRESH=10,intra:20")
{
check_rndv_startup_config(20, 10);
check_rndv_threshold(20);
}

/* Both keys given explicitly and no common value: intra-node is expected to
 * be 20 and inter-node 30. The endpoint thresholds are checked against the
 * intra-node value 20. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_intra_inter_thresh,
"RNDV_THRESH=intra:20,inter:30")
{
check_rndv_startup_config(20, 30);
check_rndv_threshold(20);
}

/* "auto" as the common value plus an inter-node override: the intra-node
 * threshold is expected to stay "auto" and the inter-node one to be 30.
 * The protocol selection is then checked with expect_rndv == false for the
 * value 30. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_inter_thresh,
"RNDV_THRESH=auto,inter:30")
{
check_rndv_startup_config(UCS_MEMUNITS_AUTO, 30);
/* TODO: configure/mock inter-node in test */

/* check that inter-node config is ignored for intra-node */
check_ep_proto_rndv_v2(30, false);
}

/* Common value 1000 plus an inter-node override 30: the intra-node
 * threshold is expected to take the common value 1000 and the inter-node
 * one to be 30. The endpoint thresholds are checked against 1000. */
UCS_TEST_P(test_ucp_tag_limits, check_rndv_inter_thresh_common,
"RNDV_THRESH=1000,inter:30")
{
check_rndv_startup_config(1000, 30);
check_rndv_threshold(1000);
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_limits)


Expand Down

0 comments on commit 9d18b6b

Please sign in to comment.