Skip to content

Commit

Permalink
UCP: Set UCT parameters from UCP API
Browse files Browse the repository at this point in the history
Backport below PR from master to integration3 branch
openucx#7019

Signed-off-by: Changcheng Liu <jerrliu@nvidia.com>
  • Loading branch information
changchengx committed Sep 26, 2021
1 parent 6d7b1dc commit 95bc48d
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver
* This routine checks API version compatibility, then discovers the available
* network interfaces, and initializes the network resources required for
* discovering of the network and memory related devices.
* This routine is responsible for initialization all information required for
* This routine is responsible for initialization all information required for
* a particular application scope, for example, MPI application, OpenSHMEM
* application, etc.
*
Expand Down
129 changes: 125 additions & 4 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ const char *ucp_feature_str[] = {
ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
ucp_config_t **config_p)
{
unsigned full_prefix_len = sizeof(UCS_DEFAULT_ENV_PREFIX) + 1;
unsigned full_prefix_len = sizeof(UCS_DEFAULT_ENV_PREFIX);
unsigned env_prefix_len = 0;
ucp_config_t *config;
ucs_status_t status;
Expand All @@ -339,7 +339,8 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,

if (env_prefix != NULL) {
env_prefix_len = strlen(env_prefix);
full_prefix_len += env_prefix_len;
/* Extra one byte for underscore _ character */
full_prefix_len += env_prefix_len + 1;
}

config->env_prefix = ucs_malloc(full_prefix_len, "ucp config");
Expand All @@ -362,6 +363,8 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
goto err_free_prefix;
}

ucs_list_head_init(&config->cached_key_list);

*config_p = config;
return UCS_OK;

Expand All @@ -373,24 +376,126 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
return status;
}

static void ucp_cached_key_release(ucs_config_cached_key_t *key_val)
{
ucs_assert(key_val != NULL);

ucs_free(key_val->key);
ucs_free(key_val->value);
ucs_free(key_val);
}

static void ucp_cached_key_list_release(ucs_list_link_t *list)
{
ucs_config_cached_key_t *key_val;

while (!ucs_list_is_empty(list)) {
key_val = ucs_list_extract_head(list, typeof(*key_val), list);
ucp_cached_key_release(key_val);
}
}

static ucs_status_t
ucp_config_cached_key_add(ucs_list_link_t *list,
const char *key, const char *value)
{
ucs_config_cached_key_t *cached_key;

cached_key = ucs_malloc(sizeof(*cached_key), "cached config key/value");
if (cached_key == NULL) {
goto err;
}

cached_key->key = ucs_strdup(key, "cached config key");
cached_key->value = ucs_strdup(value, "cached config value");
cached_key->used = 0;
if ((cached_key->key == NULL) || (cached_key->value == NULL)) {
goto err_free_key;
}

ucs_list_add_tail(list, &cached_key->list);
return UCS_OK;

err_free_key:
ucp_cached_key_release(cached_key);
err:
return UCS_ERR_NO_MEMORY;
}

void ucp_config_release(ucp_config_t *config)
{
ucp_cached_key_list_release(&config->cached_key_list);
ucs_config_parser_release_opts(config, ucp_config_table);
ucs_free(config->env_prefix);
ucs_free(config);
}

ucs_status_t ucp_config_modify_internal(ucp_config_t *config, const char *name,
const char *value)
{
return ucs_config_parser_set_value(config, ucp_config_table, name, value);
}

ucs_status_t ucp_config_modify(ucp_config_t *config, const char *name,
const char *value)
{
return ucs_config_parser_set_value(config, ucp_config_table, name, value);
ucs_status_t status;

status = ucp_config_modify_internal(config, name, value);
if (status != UCS_ERR_NO_ELEM) {
return status;
}

return ucp_config_cached_key_add(&config->cached_key_list, name, value);
}

static
void ucp_config_print_cached_uct(const ucp_config_t *config, FILE *stream,
const char *title,
ucs_config_print_flags_t flags)
{
ucs_config_cached_key_t *key_val;

if (flags & UCS_CONFIG_PRINT_HEADER) {
fprintf(stream, "\n");
fprintf(stream, "#\n");
fprintf(stream, "# Cached UCT %s\n", title);
fprintf(stream, "#\n");
fprintf(stream, "\n");
}

if (flags & UCS_CONFIG_PRINT_CONFIG) {
ucs_list_for_each(key_val, &config->cached_key_list, list) {
fprintf(stream, "%s=%s\n", key_val->key, key_val->value);
}
}

if (flags & UCS_CONFIG_PRINT_HEADER) {
fprintf(stream, "\n");
}
}

void ucp_config_print(const ucp_config_t *config, FILE *stream,
const char *title, ucs_config_print_flags_t print_flags)
{
ucs_config_parser_print_opts(stream, title, config, ucp_config_table,
NULL, UCS_DEFAULT_ENV_PREFIX, print_flags);
ucp_config_print_cached_uct(config, stream, title, print_flags);
}

void ucp_apply_uct_config_list(ucp_context_h context, void *config)
{
ucs_config_cached_key_t *key_val;
ucs_status_t status;

ucs_list_for_each(key_val, &context->cached_key_list, list) {
status = uct_config_modify(config, key_val->key, key_val->value);
if (status == UCS_OK) {
ucs_debug("apply uct configuration %s=%s",
key_val->key, key_val->value);
key_val->used = 1;
}
}
}

/* Search str in the array. If str_suffix is specified, search for
Expand Down Expand Up @@ -846,6 +951,8 @@ static ucs_status_t ucp_fill_tl_md(ucp_context_h context,
return status;
}

ucp_apply_uct_config_list(context, md_config);

status = uct_md_open(context->tl_cmpts[cmpt_index].cmpt, md_rsc->md_name,
md_config, &tl_md->md);
uct_config_release(md_config);
Expand Down Expand Up @@ -1373,6 +1480,7 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
{
unsigned i, num_alloc_methods, method;
const char *method_name;
ucs_config_cached_key_t *key_val;
ucs_status_t status;

ucp_apply_params(context, params,
Expand Down Expand Up @@ -1403,7 +1511,7 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
context->config.ext.bcopy_bw);

/* always init MT lock in context even though it is disabled by user,
* because we need to use context lock to protect ucp_mm_ and ucp_rkey_
* because we need to use context lock to protect ucp_mem_ and ucp_rkey_
* routines */
UCP_THREAD_LOCK_INIT(&context->mt_lock);

Expand Down Expand Up @@ -1486,8 +1594,18 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
}
}

ucs_list_for_each(key_val, &config->cached_key_list, list) {
status = ucp_config_cached_key_add(&context->cached_key_list,
key_val->key, key_val->value);
if (status != UCS_OK) {
goto err_free_key_list;
}
}

return UCS_OK;

err_free_key_list:
ucp_cached_key_list_release(&context->cached_key_list);
err_free_alloc_methods:
ucs_free(context->config.alloc_methods);
err_free_env_prefix:
Expand All @@ -1501,6 +1619,7 @@ static void ucp_free_config(ucp_context_h context)
{
ucs_free(context->config.alloc_methods);
ucs_free(context->config.env_prefix);
ucp_cached_key_list_release(&context->cached_key_list);
}

ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_version,
Expand Down Expand Up @@ -1539,6 +1658,8 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver
goto err_release_config;
}

ucs_list_head_init(&context->cached_key_list);

status = ucp_fill_config(context, params, config);
if (status != UCS_OK) {
goto err_free_ctx;
Expand Down
15 changes: 12 additions & 3 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ typedef struct ucp_context_config {

struct ucp_config {
/** Array of device lists names to use.
* This array holds three lists - network devices, shared memory devices
* and acceleration devices */
* This array holds four lists - network devices, shared memory devices,
* acceleration devices and loop-back devices */
ucs_config_names_array_t devices[UCT_DEVICE_TYPE_LAST];
/** Array of transport names to use */
ucs_config_names_array_t tls;
Expand All @@ -123,6 +123,8 @@ struct ucp_config {
char *env_prefix;
/** Configuration saved directly in the context */
ucp_context_config_t ctx;
/** Save ucx configurations not listed in ucp_config_table **/
ucs_list_link_t cached_key_list;
};


Expand Down Expand Up @@ -172,7 +174,6 @@ typedef struct ucp_tl_md {
* UCP context
*/
typedef struct ucp_context {

ucp_tl_cmpt_t *tl_cmpts; /* UCT components */
ucp_rsc_index_t num_cmpts; /* Number of UCT components */

Expand Down Expand Up @@ -247,6 +248,8 @@ typedef struct ucp_context {
/* All configurations about multithreading support */
ucp_mt_lock_t mt_lock;

/* Save cached uct configurations */
ucs_list_link_t cached_key_list;
} ucp_context_t;


Expand Down Expand Up @@ -466,4 +469,10 @@ uint64_t ucp_context_dev_tl_bitmap(ucp_context_h context, const char *dev_name);
uint64_t ucp_context_dev_idx_tl_bitmap(ucp_context_h context,
ucp_rsc_index_t dev_idx);

ucs_status_t
ucp_config_modify_internal(ucp_config_t *config, const char *name,
const char *value);

void ucp_apply_uct_config_list(ucp_context_h context, void *config);

#endif
33 changes: 33 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,8 @@ ucs_status_t ucp_worker_iface_open(ucp_worker_h worker, ucp_rsc_index_t tl_id,
goto err_free_iface;
}

ucp_apply_uct_config_list(context, iface_config);

UCS_STATIC_ASSERT(UCP_WORKER_HEADROOM_PRIV_SIZE >= sizeof(ucp_eager_sync_hdr_t));

/* Fill rest of uct_iface params (caller should fill specific mode fields) */
Expand Down Expand Up @@ -1290,6 +1292,8 @@ static ucs_status_t ucp_worker_add_resource_cms(ucp_worker_h worker)
goto err_free_cms;
}

ucp_apply_uct_config_list(context, cm_config);

status = uct_cm_open(cmpt, worker->uct, cm_config, &worker->cms[i].cm);
if (status != UCS_OK) {
ucs_error("failed to open CM on component %s with status %s",
Expand Down Expand Up @@ -1660,6 +1664,32 @@ static ucs_mpool_ops_t ucp_rkey_mpool_ops = {
.obj_str = NULL
};

static void ucp_warn_unused_uct_config(ucp_context_h context)
{
unsigned num_unused_cached_kv = 0;
ucs_string_buffer_t unused_cached_uct_cfg;
ucs_config_cached_key_t *key_val;

ucs_string_buffer_init(&unused_cached_uct_cfg);

ucs_list_for_each(key_val, &context->cached_key_list, list) {
if (!key_val->used) {
ucs_string_buffer_appendf(&unused_cached_uct_cfg, "%s,",
key_val->key);
++num_unused_cached_kv;
}
}

if (num_unused_cached_kv > 0) {
ucs_string_buffer_rtrim(&unused_cached_uct_cfg , ",");
ucs_warn("unused cached uct configuration%s: %s",
(num_unused_cached_kv > 1) ? "s" : "",
ucs_string_buffer_cstr(&unused_cached_uct_cfg));
}

ucs_string_buffer_cleanup(&unused_cached_uct_cfg);
}

ucs_status_t ucp_worker_create(ucp_context_h context,
const ucp_worker_params_t *params,
ucp_worker_h *worker_p)
Expand Down Expand Up @@ -1839,6 +1869,9 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
*/
ucs_config_parser_print_env_vars_once(context->config.env_prefix);

/* Warn unused cached uct configuration */
ucp_warn_unused_uct_config(context);

*worker_p = worker;
return UCS_OK;

Expand Down
9 changes: 9 additions & 0 deletions src/ucs/config/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ typedef struct ucs_config_field {
} ucs_config_field_t;


typedef struct ucs_config_cached_key {
char *key; /* Cached configuration key */
char *value; /* Cached configuration value */
int used; /* Whether this configuration was
* applied successfully */
ucs_list_link_t list; /* Element in a list of key/value entries */
} ucs_config_cached_key_t;


typedef struct ucs_ib_port_spec {
char *device_name;
unsigned port_num;
Expand Down
1 change: 0 additions & 1 deletion src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -2252,7 +2252,6 @@ ucs_status_t uct_md_config_read(uct_component_h component,
uct_md_config_t **config_p);



/**
* @ingroup UCT_MD
* @brief Check if remote sock address is accessible from the memory domain.
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void ucp_test::modify_config(const std::string& name, const std::string& value,
{
ucs_status_t status;

status = ucp_config_modify(m_ucp_config, name.c_str(), value.c_str());
status = ucp_config_modify_internal(m_ucp_config, name.c_str(), value.c_str());
if (status == UCS_ERR_NO_ELEM) {
test_base::modify_config(name, value, optional);
} else if (status != UCS_OK) {
Expand Down

0 comments on commit 95bc48d

Please sign in to comment.