Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set UCT parameters from UCP API #209

Merged
merged 1 commit into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver
* This routine checks API version compatibility, then discovers the available
* network interfaces, and initializes the network resources required for
* discovering of the network and memory related devices.
* This routine is responsible for initialization all information required for
* This routine is responsible for initializing all information required for
* a particular application scope, for example, MPI application, OpenSHMEM
* application, etc.
*
Expand Down
129 changes: 125 additions & 4 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ const char *ucp_feature_str[] = {
ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
ucp_config_t **config_p)
{
unsigned full_prefix_len = sizeof(UCS_DEFAULT_ENV_PREFIX) + 1;
unsigned full_prefix_len = sizeof(UCS_DEFAULT_ENV_PREFIX);
unsigned env_prefix_len = 0;
ucp_config_t *config;
ucs_status_t status;
Expand All @@ -339,7 +339,8 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,

if (env_prefix != NULL) {
env_prefix_len = strlen(env_prefix);
full_prefix_len += env_prefix_len;
/* Extra one byte for underscore _ character */
full_prefix_len += env_prefix_len + 1;
}

config->env_prefix = ucs_malloc(full_prefix_len, "ucp config");
Expand All @@ -362,6 +363,8 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
goto err_free_prefix;
}

ucs_list_head_init(&config->cached_key_list);

*config_p = config;
return UCS_OK;

Expand All @@ -373,24 +376,126 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
return status;
}

/**
 * Destroy a single cached key/value entry.
 *
 * Frees the value string, the key string and then the entry itself.
 *
 * @param key_val  Entry to destroy; asserted to be non-NULL.
 */
static void ucp_cached_key_release(ucs_config_cached_key_t *key_val)
{
    ucs_assert(key_val != NULL);

    /* The two strings are independent allocations hanging off the entry */
    ucs_free(key_val->value);
    ucs_free(key_val->key);

    ucs_free(key_val);
}

/**
 * Drain a list of cached key/value entries.
 *
 * Repeatedly detaches the head element and releases it until the list is
 * empty.
 *
 * @param list  Head of a list whose elements are ucs_config_cached_key_t.
 */
static void ucp_cached_key_list_release(ucs_list_link_t *list)
{
    ucs_config_cached_key_t *entry;

    while (!ucs_list_is_empty(list)) {
        /* Third argument is the link member name inside the entry type */
        entry = ucs_list_extract_head(list, ucs_config_cached_key_t, list);
        ucp_cached_key_release(entry);
    }
}

/**
 * Append a copy of a key/value pair to a cached configuration list.
 *
 * A new entry is allocated, both strings are duplicated into it, its "used"
 * flag is cleared, and the entry is linked at the tail of @a list.
 *
 * @param list   List head to append to.
 * @param key    Configuration key to duplicate.
 * @param value  Configuration value to duplicate.
 *
 * @return UCS_OK on success, UCS_ERR_NO_MEMORY if the entry or either string
 *         could not be allocated (nothing is added to the list in that case).
 */
static ucs_status_t
ucp_config_cached_key_add(ucs_list_link_t *list,
                          const char *key, const char *value)
{
    ucs_config_cached_key_t *entry;

    entry = ucs_malloc(sizeof(*entry), "cached config key/value");
    if (entry == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    entry->used  = 0;
    entry->key   = ucs_strdup(key, "cached config key");
    entry->value = ucs_strdup(value, "cached config value");

    if ((entry->key != NULL) && (entry->value != NULL)) {
        ucs_list_add_tail(list, &entry->list);
        return UCS_OK;
    }

    /* At least one duplication failed: drop the partially built entry */
    ucp_cached_key_release(entry);
    return UCS_ERR_NO_MEMORY;
}

/**
 * Release a UCP configuration object.
 *
 * Tears down, in order: the cached key/value list, the options handled by
 * the config parser for ucp_config_table, the environment prefix string, and
 * finally the configuration structure itself.
 *
 * @param config  Configuration to release.
 */
void ucp_config_release(ucp_config_t *config)
{
/* Entries stored by ucp_config_modify() for keys not found in the table */
ucp_cached_key_list_release(&config->cached_key_list);
ucs_config_parser_release_opts(config, ucp_config_table);
ucs_free(config->env_prefix);
ucs_free(config);
}

/**
 * Set a configuration value by name using ucp_config_table only.
 *
 * This is a direct pass-through to ucs_config_parser_set_value(); unlike
 * ucp_config_modify() it never stores the pair in the cached key list.
 *
 * @param config  Configuration to modify.
 * @param name    Name of the option to set.
 * @param value   New value, as a string.
 *
 * @return Status returned by ucs_config_parser_set_value().
 */
ucs_status_t ucp_config_modify_internal(ucp_config_t *config, const char *name,
const char *value)
{
return ucs_config_parser_set_value(config, ucp_config_table, name, value);
}

/**
 * Modify a configuration value by name.
 *
 * The value is first applied through ucp_config_modify_internal(). Any
 * result other than UCS_ERR_NO_ELEM is returned as-is. When the name is
 * reported as UCS_ERR_NO_ELEM, the key/value pair is appended to
 * config->cached_key_list instead, so it can be applied later (see
 * ucp_apply_uct_config_list()).
 *
 * @param config  Configuration to modify.
 * @param name    Name of the option to set.
 * @param value   New value, as a string.
 *
 * @return Status of the direct modification, or - for names reported as
 *         UCS_ERR_NO_ELEM - the status of caching the pair (UCS_OK or
 *         UCS_ERR_NO_MEMORY).
 */
ucs_status_t ucp_config_modify(ucp_config_t *config, const char *name,
                               const char *value)
{
    ucs_status_t status;

    status = ucp_config_modify_internal(config, name, value);
    if (status != UCS_ERR_NO_ELEM) {
        return status;
    }

    /* Unknown to ucp_config_table: remember the pair rather than failing */
    return ucp_config_cached_key_add(&config->cached_key_list, name, value);
}

/**
 * Print the cached key/value pairs of a configuration.
 *
 * With UCS_CONFIG_PRINT_HEADER set, a commented banner containing @a title
 * is written before the entries and a blank line after them. With
 * UCS_CONFIG_PRINT_CONFIG set, every cached entry is written as "key=value"
 * on its own line.
 *
 * @param config  Configuration whose cached_key_list is printed.
 * @param stream  Output stream.
 * @param title   Text inserted into the banner.
 * @param flags   Combination of ucs_config_print_flags_t bits.
 */
static
void ucp_config_print_cached_uct(const ucp_config_t *config, FILE *stream,
                                 const char *title,
                                 ucs_config_print_flags_t flags)
{
    const int with_header  = (flags & UCS_CONFIG_PRINT_HEADER) != 0;
    const int with_entries = (flags & UCS_CONFIG_PRINT_CONFIG) != 0;
    ucs_config_cached_key_t *entry;

    if (with_header) {
        fprintf(stream, "\n");
        fprintf(stream, "#\n");
        fprintf(stream, "# Cached UCT %s\n", title);
        fprintf(stream, "#\n");
        fprintf(stream, "\n");
    }

    if (with_entries) {
        ucs_list_for_each(entry, &config->cached_key_list, list) {
            fprintf(stream, "%s=%s\n", entry->key, entry->value);
        }
    }

    if (with_header) {
        fprintf(stream, "\n");
    }
}

/**
 * Print a UCP configuration to a stream.
 *
 * Output consists of the options described by ucp_config_table (printed by
 * the config parser), followed by the cached key/value pairs held in
 * config->cached_key_list.
 *
 * @param config       Configuration to print.
 * @param stream       Output stream.
 * @param title        Title passed to both printing stages.
 * @param print_flags  Flags controlling what is printed.
 */
void ucp_config_print(const ucp_config_t *config, FILE *stream,
const char *title, ucs_config_print_flags_t print_flags)
{
ucs_config_parser_print_opts(stream, title, config, ucp_config_table,
NULL, UCS_DEFAULT_ENV_PREFIX, print_flags);
/* Cached entries are not part of ucp_config_table; print them separately */
ucp_config_print_cached_uct(config, stream, title, print_flags);
}

/**
 * Apply the context's cached key/value pairs to a UCT configuration object.
 *
 * Every entry of context->cached_key_list is passed to uct_config_modify().
 * Entries that are applied successfully are logged at debug level and marked
 * as used; entries that fail are silently skipped and keep their current
 * "used" state.
 *
 * @param context  UCP context holding the cached entries.
 * @param config   UCT configuration object handed to uct_config_modify().
 */
void ucp_apply_uct_config_list(ucp_context_h context, void *config)
{
    ucs_config_cached_key_t *entry;

    ucs_list_for_each(entry, &context->cached_key_list, list) {
        if (uct_config_modify(config, entry->key, entry->value) != UCS_OK) {
            /* Not accepted by this configuration object; try the next one */
            continue;
        }

        ucs_debug("apply uct configuration %s=%s",
                  entry->key, entry->value);
        entry->used = 1;
    }
}

/* Search str in the array. If str_suffix is specified, search for
Expand Down Expand Up @@ -846,6 +951,8 @@ static ucs_status_t ucp_fill_tl_md(ucp_context_h context,
return status;
}

ucp_apply_uct_config_list(context, md_config);

status = uct_md_open(context->tl_cmpts[cmpt_index].cmpt, md_rsc->md_name,
md_config, &tl_md->md);
uct_config_release(md_config);
Expand Down Expand Up @@ -1373,6 +1480,7 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
{
unsigned i, num_alloc_methods, method;
const char *method_name;
ucs_config_cached_key_t *key_val;
ucs_status_t status;

ucp_apply_params(context, params,
Expand Down Expand Up @@ -1403,7 +1511,7 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
context->config.ext.bcopy_bw);

/* always init MT lock in context even though it is disabled by user,
* because we need to use context lock to protect ucp_mm_ and ucp_rkey_
* because we need to use context lock to protect ucp_mem_ and ucp_rkey_
* routines */
UCP_THREAD_LOCK_INIT(&context->mt_lock);

Expand Down Expand Up @@ -1486,8 +1594,18 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
}
}

ucs_list_for_each(key_val, &config->cached_key_list, list) {
status = ucp_config_cached_key_add(&context->cached_key_list,
key_val->key, key_val->value);
if (status != UCS_OK) {
goto err_free_key_list;
}
}

return UCS_OK;

err_free_key_list:
ucp_cached_key_list_release(&context->cached_key_list);
err_free_alloc_methods:
ucs_free(context->config.alloc_methods);
err_free_env_prefix:
Expand All @@ -1501,6 +1619,7 @@ static void ucp_free_config(ucp_context_h context)
{
ucs_free(context->config.alloc_methods);
ucs_free(context->config.env_prefix);
ucp_cached_key_list_release(&context->cached_key_list);
}

ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_version,
Expand Down Expand Up @@ -1539,6 +1658,8 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver
goto err_release_config;
}

ucs_list_head_init(&context->cached_key_list);

status = ucp_fill_config(context, params, config);
if (status != UCS_OK) {
goto err_free_ctx;
Expand Down
15 changes: 12 additions & 3 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ typedef struct ucp_context_config {

struct ucp_config {
/** Array of device lists names to use.
* This array holds three lists - network devices, shared memory devices
* and acceleration devices */
* This array holds four lists - network devices, shared memory devices,
* acceleration devices and loop-back devices */
ucs_config_names_array_t devices[UCT_DEVICE_TYPE_LAST];
/** Array of transport names to use */
ucs_config_names_array_t tls;
Expand All @@ -123,6 +123,8 @@ struct ucp_config {
char *env_prefix;
/** Configuration saved directly in the context */
ucp_context_config_t ctx;
/** Save ucx configurations not listed in ucp_config_table **/
ucs_list_link_t cached_key_list;
};


Expand Down Expand Up @@ -172,7 +174,6 @@ typedef struct ucp_tl_md {
* UCP context
*/
typedef struct ucp_context {

ucp_tl_cmpt_t *tl_cmpts; /* UCT components */
ucp_rsc_index_t num_cmpts; /* Number of UCT components */

Expand Down Expand Up @@ -247,6 +248,8 @@ typedef struct ucp_context {
/* All configurations about multithreading support */
ucp_mt_lock_t mt_lock;

/* Save cached uct configurations */
ucs_list_link_t cached_key_list;
} ucp_context_t;


Expand Down Expand Up @@ -466,4 +469,10 @@ uint64_t ucp_context_dev_tl_bitmap(ucp_context_h context, const char *dev_name);
uint64_t ucp_context_dev_idx_tl_bitmap(ucp_context_h context,
ucp_rsc_index_t dev_idx);

/**
 * Set a configuration value by name without caching unknown keys.
 *
 * @param config  Configuration to modify.
 * @param name    Name of the option to set.
 * @param value   New value, as a string.
 *
 * @return Status of the underlying config parser call.
 */
ucs_status_t
ucp_config_modify_internal(ucp_config_t *config, const char *name,
const char *value);

/**
 * Apply the cached key/value pairs stored in @a context to a UCT
 * configuration object, marking each successfully applied entry as used.
 *
 * @param context  UCP context holding the cached entries.
 * @param config   UCT configuration object to modify.
 */
void ucp_apply_uct_config_list(ucp_context_h context, void *config);

#endif
33 changes: 33 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,8 @@ ucs_status_t ucp_worker_iface_open(ucp_worker_h worker, ucp_rsc_index_t tl_id,
goto err_free_iface;
}

ucp_apply_uct_config_list(context, iface_config);

UCS_STATIC_ASSERT(UCP_WORKER_HEADROOM_PRIV_SIZE >= sizeof(ucp_eager_sync_hdr_t));

/* Fill rest of uct_iface params (caller should fill specific mode fields) */
Expand Down Expand Up @@ -1290,6 +1292,8 @@ static ucs_status_t ucp_worker_add_resource_cms(ucp_worker_h worker)
goto err_free_cms;
}

ucp_apply_uct_config_list(context, cm_config);

status = uct_cm_open(cmpt, worker->uct, cm_config, &worker->cms[i].cm);
if (status != UCS_OK) {
ucs_error("failed to open CM on component %s with status %s",
Expand Down Expand Up @@ -1660,6 +1664,32 @@ static ucs_mpool_ops_t ucp_rkey_mpool_ops = {
.obj_str = NULL
};

/**
 * Emit a single warning naming every cached configuration key that has not
 * been marked as used.
 *
 * The keys are collected into a comma-separated string; nothing is logged
 * when all cached entries were used (or the list is empty).
 *
 * @param context  UCP context whose cached_key_list is inspected.
 */
static void ucp_warn_unused_uct_config(ucp_context_h context)
{
    ucs_string_buffer_t unused_keys;
    ucs_config_cached_key_t *entry;
    unsigned num_unused;

    ucs_string_buffer_init(&unused_keys);

    num_unused = 0;
    ucs_list_for_each(entry, &context->cached_key_list, list) {
        if (entry->used) {
            continue;
        }

        ucs_string_buffer_appendf(&unused_keys, "%s,", entry->key);
        ++num_unused;
    }

    if (num_unused != 0) {
        /* Strip the separator appended after the last key */
        ucs_string_buffer_rtrim(&unused_keys, ",");
        ucs_warn("unused cached uct configuration%s: %s",
                 (num_unused > 1) ? "s" : "",
                 ucs_string_buffer_cstr(&unused_keys));
    }

    ucs_string_buffer_cleanup(&unused_keys);
}

ucs_status_t ucp_worker_create(ucp_context_h context,
const ucp_worker_params_t *params,
ucp_worker_h *worker_p)
Expand Down Expand Up @@ -1839,6 +1869,9 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
*/
ucs_config_parser_print_env_vars_once(context->config.env_prefix);

/* Warn unused cached uct configuration */
ucp_warn_unused_uct_config(context);

*worker_p = worker;
return UCS_OK;

Expand Down
9 changes: 9 additions & 0 deletions src/ucs/config/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ typedef struct ucs_config_field {
} ucs_config_field_t;


/**
 * A configuration key/value pair kept aside for later application.
 *
 * Entries are linked together through @a list; @a used records whether the
 * pair has been applied successfully at least once.
 */
typedef struct ucs_config_cached_key {
char *key; /* Cached configuration key */
char *value; /* Cached configuration value */
int used; /* Whether this configuration was
* applied successfully */
ucs_list_link_t list; /* Element in a list of key/value entries */
} ucs_config_cached_key_t;


typedef struct ucs_ib_port_spec {
char *device_name;
unsigned port_num;
Expand Down
1 change: 0 additions & 1 deletion src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -2252,7 +2252,6 @@ ucs_status_t uct_md_config_read(uct_component_h component,
uct_md_config_t **config_p);



/**
* @ingroup UCT_MD
* @brief Check if remote sock address is accessible from the memory domain.
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void ucp_test::modify_config(const std::string& name, const std::string& value,
{
ucs_status_t status;

status = ucp_config_modify(m_ucp_config, name.c_str(), value.c_str());
status = ucp_config_modify_internal(m_ucp_config, name.c_str(), value.c_str());
if (status == UCS_ERR_NO_ELEM) {
test_base::modify_config(name, value, optional);
} else if (status != UCS_OK) {
Expand Down