Skip to content

Commit

Permalink
UCP: Set UCT parameters from UCP API
Browse files Browse the repository at this point in the history
Signed-off-by: Changcheng Liu <jerrliu@nvidia.com>
  • Loading branch information
changchengx committed Sep 23, 2021
1 parent cced792 commit 7a2c75a
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1931,7 +1931,7 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver
* This routine checks API version compatibility, then discovers the available
* network interfaces, and initializes the network resources required for
* discovering of the network and memory related devices.
* This routine is responsible for initialization all information required for
* This routine is responsible for initialization all information required for
* a particular application scope, for example, MPI application, OpenSHMEM
* application, etc.
*
Expand Down
129 changes: 125 additions & 4 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ const ucp_tl_bitmap_t ucp_tl_bitmap_min = UCS_BITMAP_ZERO;
ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
ucp_config_t **config_p)
{
unsigned full_prefix_len = sizeof(UCS_DEFAULT_ENV_PREFIX) + 1;
unsigned full_prefix_len = sizeof(UCS_DEFAULT_ENV_PREFIX);
unsigned env_prefix_len = 0;
ucp_config_t *config;
ucs_status_t status;
Expand All @@ -416,7 +416,8 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,

if (env_prefix != NULL) {
env_prefix_len = strlen(env_prefix);
full_prefix_len += env_prefix_len;
/* Extra one byte for underscore _ character */
full_prefix_len += env_prefix_len + 1;
}

config->env_prefix = ucs_malloc(full_prefix_len, "ucp config");
Expand All @@ -439,6 +440,8 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
goto err_free_prefix;
}

ucs_list_head_init(&config->cached_key_list);

*config_p = config;
return UCS_OK;

Expand All @@ -450,24 +453,126 @@ ucs_status_t ucp_config_read(const char *env_prefix, const char *filename,
return status;
}

static void ucp_cached_key_release(ucs_config_cached_key_t *key_val)
{
ucs_assert(key_val != NULL);

ucs_free(key_val->key);
ucs_free(key_val->value);
ucs_free(key_val);
}

static void ucp_cached_key_list_release(ucs_list_link_t *list)
{
ucs_config_cached_key_t *key_val;

while (!ucs_list_is_empty(list)) {
key_val = ucs_list_extract_head(list, typeof(*key_val), list);
ucp_cached_key_release(key_val);
}
}

static ucs_status_t
ucp_config_cached_key_add(ucs_list_link_t *list,
const char *key, const char *value)
{
ucs_config_cached_key_t *cached_key;

cached_key = ucs_malloc(sizeof(*cached_key), "cached config key/value");
if (cached_key == NULL) {
goto err;
}

cached_key->key = ucs_strdup(key, "cached config key");
cached_key->value = ucs_strdup(value, "cached config value");
cached_key->used = 0;
if ((cached_key->key == NULL) || (cached_key->value == NULL)) {
goto err_free_key;
}

ucs_list_add_tail(list, &cached_key->list);
return UCS_OK;

err_free_key:
ucp_cached_key_release(cached_key);
err:
return UCS_ERR_NO_MEMORY;
}

void ucp_config_release(ucp_config_t *config)
{
ucp_cached_key_list_release(&config->cached_key_list);
ucs_config_parser_release_opts(config, ucp_config_table);
ucs_free(config->env_prefix);
ucs_free(config);
}

ucs_status_t ucp_config_modify_internal(ucp_config_t *config, const char *name,
const char *value)
{
return ucs_config_parser_set_value(config, ucp_config_table, name, value);
}

ucs_status_t ucp_config_modify(ucp_config_t *config, const char *name,
const char *value)
{
return ucs_config_parser_set_value(config, ucp_config_table, name, value);
ucs_status_t status;

status = ucp_config_modify_internal(config, name, value);
if (status != UCS_ERR_NO_ELEM) {
return status;
}

return ucp_config_cached_key_add(&config->cached_key_list, name, value);
}

static
void ucp_config_print_cached_uct(const ucp_config_t *config, FILE *stream,
const char *title,
ucs_config_print_flags_t flags)
{
ucs_config_cached_key_t *key_val;

if (flags & UCS_CONFIG_PRINT_HEADER) {
fprintf(stream, "\n");
fprintf(stream, "#\n");
fprintf(stream, "# Cached UCT %s\n", title);
fprintf(stream, "#\n");
fprintf(stream, "\n");
}

if (flags & UCS_CONFIG_PRINT_CONFIG) {
ucs_list_for_each(key_val, &config->cached_key_list, list) {
fprintf(stream, "%s=%s\n", key_val->key, key_val->value);
}
}

if (flags & UCS_CONFIG_PRINT_HEADER) {
fprintf(stream, "\n");
}
}

void ucp_config_print(const ucp_config_t *config, FILE *stream,
const char *title, ucs_config_print_flags_t print_flags)
{
ucs_config_parser_print_opts(stream, title, config, ucp_config_table,
NULL, UCS_DEFAULT_ENV_PREFIX, print_flags);
ucp_config_print_cached_uct(config, stream, title, print_flags);
}

void ucp_apply_uct_config_list(ucp_context_h context, void *config)
{
ucs_config_cached_key_t *key_val;
ucs_status_t status;

ucs_list_for_each(key_val, &context->cached_key_list, list) {
status = uct_config_modify(config, key_val->key, key_val->value);
if (status == UCS_OK) {
ucs_debug("apply uct configuration %s=%s",
key_val->key, key_val->value);
key_val->used = 1;
}
}
}

/* Search str in the array. If str_suffix is specified, search for
Expand Down Expand Up @@ -920,6 +1025,8 @@ static ucs_status_t ucp_fill_tl_md(ucp_context_h context,
return status;
}

ucp_apply_uct_config_list(context, md_config);

status = uct_md_open(context->tl_cmpts[cmpt_index].cmpt, md_rsc->md_name,
md_config, &tl_md->md);
uct_config_release(md_config);
Expand Down Expand Up @@ -1364,6 +1471,7 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
ucp_proto_id_t proto_id;
ucs_status_t status;
int match;
ucs_config_cached_key_t *key_val;

ucp_apply_params(context, params,
config->ctx.use_mt_mutex ? UCP_MT_TYPE_MUTEX
Expand Down Expand Up @@ -1408,7 +1516,7 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
}

/* always init MT lock in context even though it is disabled by user,
* because we need to use context lock to protect ucp_mm_ and ucp_rkey_
* because we need to use context lock to protect ucp_mem_ and ucp_rkey_
* routines */
UCP_THREAD_LOCK_INIT(&context->mt_lock);

Expand Down Expand Up @@ -1505,8 +1613,18 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
goto err_free_alloc_methods;
}

ucs_list_for_each(key_val, &config->cached_key_list, list) {
status = ucp_config_cached_key_add(&context->cached_key_list,
key_val->key, key_val->value);
if (status != UCS_OK) {
goto err_free_key_list;
}
}

return UCS_OK;

err_free_key_list:
ucp_cached_key_list_release(&context->cached_key_list);
err_free_alloc_methods:
ucs_free(context->config.alloc_methods);
err_free_env_prefix:
Expand All @@ -1523,6 +1641,7 @@ static void ucp_free_config(ucp_context_h context)
ucs_free(context->config.alloc_methods);
ucs_free(context->config.env_prefix);
ucs_free(context->config.selection_cmp);
ucp_cached_key_list_release(&context->cached_key_list);
}

static void ucp_context_create_vfs(ucp_context_h context)
Expand Down Expand Up @@ -1569,6 +1688,8 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver
goto err_release_config;
}

ucs_list_head_init(&context->cached_key_list);

status = ucp_fill_config(context, params, config);
if (status != UCS_OK) {
goto err_free_ctx;
Expand Down
17 changes: 14 additions & 3 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ typedef struct ucp_context_config {

struct ucp_config {
/** Array of device lists names to use.
* This array holds three lists - network devices, shared memory devices
* and acceleration devices */
* This array holds four lists - network devices, shared memory devices,
* acceleration devices and loop-back devices */
ucs_config_names_array_t devices[UCT_DEVICE_TYPE_LAST];
/** Array of transport names to use */
ucs_config_allow_list_t tls;
Expand All @@ -146,6 +146,8 @@ struct ucp_config {
char *selection_cmp;
/** Configuration saved directly in the context */
ucp_context_config_t ctx;
/** Save ucx configurations not listed in ucp_config_table **/
ucs_list_link_t cached_key_list;
};


Expand Down Expand Up @@ -195,7 +197,6 @@ typedef struct ucp_tl_md {
* UCP context
*/
typedef struct ucp_context {

ucp_tl_cmpt_t *tl_cmpts; /* UCT components */
ucp_rsc_index_t num_cmpts; /* Number of UCT components */

Expand Down Expand Up @@ -276,6 +277,8 @@ typedef struct ucp_context {

char name[UCP_ENTITY_NAME_MAX];

/* Save cached uct configurations */
ucs_list_link_t cached_key_list;
} ucp_context_t;


Expand Down Expand Up @@ -546,4 +549,12 @@ void ucp_tl_bitmap_validate(const ucp_tl_bitmap_t *tl_bitmap,

const char* ucp_context_cm_name(ucp_context_h context, ucp_rsc_index_t cm_idx);


ucs_status_t
ucp_config_modify_internal(ucp_config_t *config, const char *name,
const char *value);


void ucp_apply_uct_config_list(ucp_context_h context, void *config);

#endif
33 changes: 33 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,8 @@ ucs_status_t ucp_worker_iface_open(ucp_worker_h worker, ucp_rsc_index_t tl_id,
goto err_free_iface;
}

ucp_apply_uct_config_list(context, iface_config);

UCS_STATIC_ASSERT(UCP_WORKER_HEADROOM_PRIV_SIZE >= sizeof(ucp_eager_sync_hdr_t));

/* Fill rest of uct_iface params (caller should fill specific mode fields) */
Expand Down Expand Up @@ -1351,6 +1353,8 @@ static ucs_status_t ucp_worker_add_resource_cms(ucp_worker_h worker)
goto err_free_cms;
}

ucp_apply_uct_config_list(context, cm_config);

status = uct_cm_open(cmpt, worker->uct, cm_config, &worker->cms[i].cm);
uct_config_release(cm_config);
if (status != UCS_OK) {
Expand Down Expand Up @@ -1945,6 +1949,32 @@ ucs_thread_mode_t ucp_worker_get_thread_mode(uint64_t worker_flags)
return UCS_THREAD_MODE_SINGLE;
}

static void ucp_warn_unused_uct_config(ucp_context_h context)
{
unsigned num_unused_cached_kv = 0;
ucs_string_buffer_t unused_cached_uct_cfg;
ucs_config_cached_key_t *key_val;

ucs_string_buffer_init(&unused_cached_uct_cfg);

ucs_list_for_each(key_val, &context->cached_key_list, list) {
if (!key_val->used) {
ucs_string_buffer_appendf(&unused_cached_uct_cfg, "%s,",
key_val->key);
++num_unused_cached_kv;
}
}

if (num_unused_cached_kv > 0) {
ucs_string_buffer_rtrim(&unused_cached_uct_cfg , ",");
ucs_warn("unused cached uct configuration%s: %s",
(num_unused_cached_kv > 1) ? "s" : "",
ucs_string_buffer_cstr(&unused_cached_uct_cfg));
}

ucs_string_buffer_cleanup(&unused_cached_uct_cfg);
}

static void
ucp_worker_vfs_show_primitive(void *obj, ucs_string_buffer_t *strb,
void *arg_ptr, uint64_t arg_u64)
Expand Down Expand Up @@ -2175,6 +2205,9 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
*/
ucs_config_parser_print_env_vars_once(context->config.env_prefix);

/* Warn unused cached uct configuration */
ucp_warn_unused_uct_config(context);

ucp_worker_create_vfs(context, worker);

*worker_p = worker;
Expand Down
9 changes: 9 additions & 0 deletions src/ucs/config/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ typedef struct ucs_config_field {
} ucs_config_field_t;


typedef struct ucs_config_cached_key {
char *key; /* Cached configuration key */
char *value; /* Cached configuration value */
int used; /* Whether this configuration was
* applied successfully */
ucs_list_link_t list; /* Element in a list of key/value entries */
} ucs_config_cached_key_t;


typedef struct ucs_ib_port_spec {
char *device_name;
unsigned port_num;
Expand Down
1 change: 0 additions & 1 deletion src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -2470,7 +2470,6 @@ ucs_status_t uct_md_config_read(uct_component_h component,
uct_md_config_t **config_p);



/**
* @ingroup UCT_MD
* @brief Check if remote sock address is accessible from the memory domain.
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ void ucp_test::modify_config(const std::string& name, const std::string& value,
{
ucs_status_t status;

status = ucp_config_modify(m_ucp_config, name.c_str(), value.c_str());
status = ucp_config_modify_internal(m_ucp_config, name.c_str(), value.c_str());
if (status == UCS_ERR_NO_ELEM) {
test_base::modify_config(name, value, mode);
} else if (status != UCS_OK) {
Expand Down

0 comments on commit 7a2c75a

Please sign in to comment.