Skip to content

Commit

Permalink
Merge pull request #5900 from evgeny-leksikov/ucp_disable_non_cm_sockaddr
Browse files Browse the repository at this point in the history

UCP/WIREUP: remove non-cm sockaddr wireup
  • Loading branch information
yosefe authored Jan 12, 2021
2 parents 19a9a4f + 2a3ad00 commit cbc5f2f
Show file tree
Hide file tree
Showing 19 changed files with 116 additions and 911 deletions.
1 change: 0 additions & 1 deletion buildlib/io_demo/az-stage-io-demo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ steps:
sudo /hpc/local/bin/lshca
mkdir -p $(workspace)/${{ parameters.name }}
# set UCX environment variables
export UCX_SOCKADDR_CM_ENABLE=y
export UCX_NET_DEVICES=$(ibdev2netdev | sed -ne 's/\(\w*\) port \([0-9]\) ==> '${roce_iface}' .*/\1:\2/p')
export LD_LIBRARY_PATH=$(workspace)/install/lib:$LD_LIBRARY_PATH
$(workspace)/test/apps/iodemo/run_io_demo.sh \
Expand Down
94 changes: 6 additions & 88 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,6 @@ static ucs_config_field_t ucp_config_table[] = {
"of all entities which connect to each other are the same.",
ucs_offsetof(ucp_config_t, ctx.unified_mode), UCS_CONFIG_TYPE_BOOL},

{"SOCKADDR_CM_ENABLE", "y",
"Enable alternative wireup protocol for sockaddr connected endpoints.\n"
"Enabling this mode changes underlying UCT mechanism for connection\n"
"establishment and enables synchronized close protocol which does not\n"
"require out of band synchronization before destroying UCP resources.",
ucs_offsetof(ucp_config_t, ctx.sockaddr_cm_enable), UCS_CONFIG_TYPE_TERNARY},

{"CM_USE_ALL_DEVICES", "y",
"When creating client/server endpoints, use all available devices.\n"
"If disabled, use only the one device on which the connection\n"
Expand Down Expand Up @@ -959,73 +952,9 @@ static void ucp_resource_config_str(const ucp_config_t *config, char *buf,
}
}

/**
 * Rebuild context->config.sockaddr_aux_rscs_bitmap from the configured list
 * of sockaddr auxiliary transports.
 *
 * Every resource index set in context->tl_bitmap is tested with
 * ucp_is_resource_in_transports_list() against config->sockaddr_aux_tls;
 * matching indexes get their bit set in the resulting bitmap.
 *
 * @param [in,out] context  Context whose resources are scanned and whose
 *                          sockaddr_aux_rscs_bitmap is overwritten.
 * @param [in]     config   Configuration holding the auxiliary transports
 *                          list (names and count).
 */
static void ucp_fill_sockaddr_aux_tls_config(ucp_context_h context,
                                             const ucp_config_t *config)
{
    const char **aux_tl_names = (const char**)config->sockaddr_aux_tls.aux_tls;
    unsigned num_aux_tls      = config->sockaddr_aux_tls.count;
    /* Output arguments required by the lookup helper; the values written to
     * them are not used by this function */
    uint8_t unused_flags      = 0;
    uint64_t unused_mask      = 0;
    ucp_rsc_index_t rsc_idx;
    int is_aux_tl;

    /* Start from an empty bitmap, bits are only added below */
    context->config.sockaddr_aux_rscs_bitmap = 0;

    /* Check if any of the context's resources are present in the sockaddr
     * auxiliary transports for the client-server flow */
    ucs_for_each_bit(rsc_idx, context->tl_bitmap) {
        is_aux_tl = ucp_is_resource_in_transports_list(
                context->tl_rscs[rsc_idx].tl_rsc.tl_name, aux_tl_names,
                num_aux_tls, &unused_flags, &unused_mask);
        if (is_aux_tl) {
            context->config.sockaddr_aux_rscs_bitmap |= UCS_BIT(rsc_idx);
        }
    }
}

/**
 * Fill context->config.sockaddr_tl_ids with the indexes of the context's
 * sockaddr-capable transport resources, ordered by the given priority list,
 * and store the number of selected entries in
 * context->config.num_sockaddr_tls.
 *
 * A resource is considered sockaddr-capable when the memory domain it refers
 * to has UCT_MD_FLAG_SOCKADDR set. A priority entry equal to "*" matches every
 * resource that was not selected yet; any other entry is compared to the
 * resource's transport name (up to UCT_TL_NAME_MAX characters).
 *
 * @param [in,out] context            Context to scan and update.
 * @param [in]     sockaddr_tl_names  Transport names in priority order.
 * @param [in]     num_sockaddr_tls   Number of entries in sockaddr_tl_names.
 */
static void ucp_fill_sockaddr_tls_prio_list(ucp_context_h context,
                                            const char **sockaddr_tl_names,
                                            ucp_rsc_index_t num_sockaddr_tls)
{
    uint64_t pending_bitmap      = 0;
    ucp_rsc_index_t num_selected = 0;
    ucp_tl_resource_desc_t *rsc;
    const char *wanted_name;
    ucp_rsc_index_t rsc_idx;
    ucp_rsc_index_t prio;

    /* Collect all resources whose memory domain supports sockaddr */
    for (rsc_idx = 0; rsc_idx < context->num_tls; ++rsc_idx) {
        rsc = &context->tl_rscs[rsc_idx];
        if (context->tl_mds[rsc->md_index].attr.cap.flags &
            UCT_MD_FLAG_SOCKADDR) {
            pending_bitmap |= UCS_BIT(rsc_idx);
        }
    }

    /* Walk the priority list; for each entry pick every still-pending
     * resource that matches it and remember its index for the client/server
     * usage later */
    for (prio = 0; prio < num_sockaddr_tls; prio++) {
        wanted_name = sockaddr_tl_names[prio];

        ucs_for_each_bit(rsc_idx, pending_bitmap) {
            rsc = &context->tl_rscs[rsc_idx];

            if (!strcmp(wanted_name, "*") ||
                !strncmp(wanted_name, rsc->tl_rsc.tl_name, UCT_TL_NAME_MAX)) {
                context->config.sockaddr_tl_ids[num_selected] = rsc_idx;
                num_selected++;
                /* A selected resource must not be picked again by a later
                 * priority entry */
                pending_bitmap &= ~UCS_BIT(rsc_idx);
            }
        }
    }

    context->config.num_sockaddr_tls = num_selected;
}

static void ucp_fill_sockaddr_cms_prio_list(ucp_context_h context,
const char **sockaddr_cm_names,
ucp_rsc_index_t num_sockaddr_cms,
int sockaddr_cm_enable)
ucp_rsc_index_t num_sockaddr_cms)
{
uint64_t cm_cmpts_bitmap = context->config.cm_cmpts_bitmap;
uint64_t cm_cmpts_bitmap_safe;
Expand All @@ -1034,10 +963,6 @@ static void ucp_fill_sockaddr_cms_prio_list(ucp_context_h context,
memset(&context->config.cm_cmpt_idxs, UCP_NULL_RESOURCE, UCP_MAX_RESOURCES);
context->config.num_cm_cmpts = 0;

if (!sockaddr_cm_enable) {
return;
}

/* Parse the sockaddr CMs priority list */
for (cm_idx = 0; cm_idx < num_sockaddr_cms; ++cm_idx) {
/* go over the priority list and find the CM's cm_idx in the
Expand All @@ -1061,8 +986,6 @@ static ucs_status_t ucp_fill_sockaddr_prio_list(ucp_context_h context,
{
const char **sockaddr_tl_names = (const char**)config->sockaddr_cm_tls.cm_tls;
unsigned num_sockaddr_tls = config->sockaddr_cm_tls.count;
int sockaddr_cm_enable = context->config.ext.sockaddr_cm_enable !=
UCS_NO;

/* Check if a list of sockaddr transports/CMs has valid length */
if (num_sockaddr_tls > UCP_MAX_RESOURCES) {
Expand All @@ -1071,13 +994,10 @@ static ucs_status_t ucp_fill_sockaddr_prio_list(ucp_context_h context,
num_sockaddr_tls = UCP_MAX_RESOURCES;
}

ucp_fill_sockaddr_tls_prio_list(context, sockaddr_tl_names,
num_sockaddr_tls);
ucp_fill_sockaddr_cms_prio_list(context, sockaddr_tl_names,
num_sockaddr_tls, sockaddr_cm_enable);
if ((context->config.ext.sockaddr_cm_enable == UCS_YES) &&
(context->config.num_cm_cmpts == 0)) {
ucs_error("UCX_SOCKADDR_CM_ENABLE is set to yes but none of the available components supports SOCKADDR_CM");
num_sockaddr_tls);
if (context->config.num_cm_cmpts == 0) {
ucs_diag("none of the available components supports sockaddr connection management");
return UCS_ERR_UNSUPPORTED;
}

Expand Down Expand Up @@ -1274,9 +1194,8 @@ static ucs_status_t ucp_fill_resources(ucp_context_h context,
max_mds += context->tl_cmpts[i].attr.md_resource_count;
}

if ((context->config.ext.sockaddr_cm_enable == UCS_YES) &&
(context->config.cm_cmpts_bitmap == 0)) {
ucs_error("there are no UCT components with CM capability");
if (context->config.cm_cmpts_bitmap == 0) {
ucs_debug("there are no UCT components with CM capability");
status = UCS_ERR_UNSUPPORTED;
goto err_free_resources;
}
Expand Down Expand Up @@ -1339,7 +1258,6 @@ static ucs_status_t ucp_fill_resources(ucp_context_h context,
goto err_free_resources;
}

ucp_fill_sockaddr_aux_tls_config(context, config);
status = ucp_fill_sockaddr_prio_list(context, config);
if (status != UCS_OK) {
goto err_free_resources;
Expand Down
9 changes: 0 additions & 9 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ typedef struct ucp_context_config {
int flush_worker_eps;
/** Enable optimizations suitable for homogeneous systems */
int unified_mode;
/** Enable cm wireup-and-close protocol for client-server connections */
ucs_ternary_auto_value_t sockaddr_cm_enable;
/** Enable cm wireup message exchange to select the best transports
* for all lanes after cm phase is done */
int cm_use_all_devices;
Expand Down Expand Up @@ -240,13 +238,6 @@ typedef struct ucp_context {
/* Cached map of components which support CM capability */
uint64_t cm_cmpts_bitmap;

/* Bitmap of sockaddr auxiliary transports to pack for client/server flow */
uint64_t sockaddr_aux_rscs_bitmap;

/* Array of sockaddr transports indexes.
* The indexes appear in the configured priority order */
ucp_rsc_index_t sockaddr_tl_ids[UCP_MAX_RESOURCES];
ucp_rsc_index_t num_sockaddr_tls;
/* Array of CMs indexes. The indexes appear in the configured priority
* order. */
ucp_rsc_index_t cm_cmpt_idxs[UCP_MAX_RESOURCES];
Expand Down
127 changes: 14 additions & 113 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,57 +229,13 @@ static void ucp_ep_delete(ucp_ep_h ep)
ucp_ep_destroy_base(ep);
}

/**
 * Create an endpoint on @a worker, initialize a wireup endpoint for it and
 * connect that wireup endpoint's auxiliary transport to @a remote_address.
 *
 * @param [in]  worker          Worker on which the endpoint is created.
 * @param [in]  ep_init_flags   Endpoint initialization flags, forwarded to
 *                              every step.
 * @param [in]  remote_address  Unpacked address of the remote peer.
 * @param [out] ep_p            Filled with the new endpoint on success only.
 *
 * @return UCS_OK on success, otherwise the status of the step that failed.
 *         On failure everything created so far is released.
 */
ucs_status_t
ucp_ep_create_sockaddr_aux(ucp_worker_h worker, unsigned ep_init_flags,
                           const ucp_unpacked_address_t *remote_address,
                           ucp_ep_h *ep_p)
{
    ucp_wireup_ep_t *wireup_ep;
    ucs_status_t status;
    ucp_ep_h new_ep;

    /* allocate endpoint */
    status = ucp_worker_create_ep(worker, ep_init_flags, remote_address->name,
                                  "listener", &new_ep);
    if (status != UCS_OK) {
        /* nothing was created, so there is nothing to undo */
        return status;
    }

    status = ucp_ep_init_create_wireup(new_ep, ep_init_flags, &wireup_ep);
    if (status != UCS_OK) {
        goto out_delete_ep;
    }

    status = ucp_wireup_ep_connect_aux(wireup_ep, ep_init_flags,
                                       remote_address);
    if (status != UCS_OK) {
        goto out_destroy_uct_ep;
    }

    *ep_p = new_ep;
    return UCS_OK;

out_destroy_uct_ep:
    uct_ep_destroy(new_ep->uct_eps[0]);
out_delete_ep:
    ucp_ep_delete(new_ep);
    return status;
}

/**
 * Set the error handling mode of an endpoint configuration key according to
 * the endpoint initialization flags.
 *
 * @param [out] key            Configuration key whose err_mode is written.
 * @param [in]  ep_init_flags  Endpoint initialization flags; only
 *                             UCP_EP_INIT_ERR_MODE_PEER_FAILURE is examined.
 */
void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
                                    unsigned ep_init_flags)
{
    if (ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE) {
        key->err_mode = UCP_ERR_HANDLING_MODE_PEER;
    } else {
        key->err_mode = UCP_ERR_HANDLING_MODE_NONE;
    }
}

/**
 * Check whether an endpoint is a sockaddr "stub": its lane 0 has no resource
 * assigned (UCP_NULL_RESOURCE) and it has no CM lane.
 *
 * @param [in] ep  Endpoint to check.
 *
 * @return Non-zero if the endpoint is a stub, 0 otherwise.
 */
int ucp_ep_is_sockaddr_stub(ucp_ep_h ep)
{
    /* Only a sockaddr client-side endpoint may be created as a "stub" */
    if (ucp_ep_get_rsc_index(ep, 0) != UCP_NULL_RESOURCE) {
        return 0;
    }

    return !ucp_ep_has_cm_lane(ep);
}

static ucs_status_t
ucp_ep_adjust_params(ucp_ep_h ep, const ucp_ep_params_t *params)
{
Expand Down Expand Up @@ -390,6 +346,9 @@ ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
ucp_ep_config_key_t key;
ucs_status_t status;

ucs_assert(ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT);
ucs_assert(ucp_worker_num_cm_cmpts(ep->worker) != 0);

ucp_ep_config_key_reset(&key);
ucp_ep_config_key_set_err_mode(&key, ep_init_flags);

Expand Down Expand Up @@ -477,7 +436,8 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker,

/* allocate endpoint */
ucs_sockaddr_str(params->sockaddr.addr, peer_name, sizeof(peer_name));
ep_init_flags = ucp_ep_init_flags(worker, params);
ep_init_flags = ucp_ep_init_flags(worker, params) |
ucp_cm_ep_init_flags(params);

status = ucp_worker_create_ep(worker, ep_init_flags, peer_name,
"from api call", &ep);
Expand All @@ -495,9 +455,7 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker,
goto err_cleanup_lanes;
}

status = ucp_worker_sockaddr_is_cm_proto(ep->worker) ?
ucp_ep_client_cm_connect_start(ep, params) :
ucp_wireup_ep_connect_to_sockaddr(ep->uct_eps[0], params);
status = ucp_ep_client_cm_connect_start(ep, params);
if (status != UCS_OK) {
goto err_cleanup_lanes;
}
Expand Down Expand Up @@ -544,77 +502,20 @@ ucs_status_t ucp_ep_create_server_accept(ucp_worker_h worker,
return status;
}

switch (sa_data->addr_mode) {
case UCP_WIREUP_SA_DATA_FULL_ADDR:
/* create endpoint to the worker address we got in the private data */
status = ucp_ep_create_to_worker_addr(worker, UINT64_MAX, &remote_addr,
ep_init_flags |
UCP_EP_INIT_CREATE_AM_LANE,
"listener", ep_p);
if (status != UCS_OK) {
goto non_cm_err_reject;
}

ucs_assert(ucp_ep_config(*ep_p)->key.err_mode == sa_data->err_mode);
ucp_ep_flush_state_reset(*ep_p);
ucp_ep_update_remote_id(*ep_p, sa_data->ep_id);
/* send wireup request message, to connect the client to the server's
new endpoint */
ucs_assert(!((*ep_p)->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED));
status = ucp_wireup_send_request(*ep_p);
if (status != UCS_OK) {
goto non_cm_err_destroy_ep;
}
break;
case UCP_WIREUP_SA_DATA_PARTIAL_ADDR:
status = ucp_ep_create_sockaddr_aux(worker, ep_init_flags,
&remote_addr, ep_p);
if (status != UCS_OK) {
goto non_cm_err_reject;
}

ucp_ep_update_remote_id(*ep_p, sa_data->ep_id);
/* the server's ep should be aware of the sent address from the client */
(*ep_p)->flags |= UCP_EP_FLAG_LISTENER;
/* NOTE: protect union */
ucs_assert(!((*ep_p)->flags & (UCP_EP_FLAG_ON_MATCH_CTX |
UCP_EP_FLAG_FLUSH_STATE_VALID)));
status = ucp_wireup_send_pre_request(*ep_p);
if (status != UCS_OK) {
goto non_cm_err_destroy_ep;
}
break;
case UCP_WIREUP_SA_DATA_CM_ADDR:
ucs_assert(ucp_worker_sockaddr_is_cm_proto(worker));
for (i = 0; i < remote_addr.address_count; ++i) {
remote_addr.address_list[i].dev_addr = conn_request->remote_dev_addr;
remote_addr.address_list[i].dev_index = conn_request->sa_data.dev_index;
}
status = ucp_ep_cm_server_create_connected(worker, ep_init_flags,
&remote_addr, conn_request,
ep_p);
ucs_free(remote_addr.address_list);
return status;
default:
if (sa_data->addr_mode != UCP_WIREUP_SA_DATA_CM_ADDR) {
ucs_fatal("client sockaddr data contains invalid address mode %d",
sa_data->addr_mode);
}

/* common non-CM flow */
status = uct_iface_accept(conn_request->uct.iface,
conn_request->uct_req);
goto non_cm_out;
for (i = 0; i < remote_addr.address_count; ++i) {
remote_addr.address_list[i].dev_addr = conn_request->remote_dev_addr;
remote_addr.address_list[i].dev_index = conn_request->sa_data.dev_index;
}

non_cm_err_destroy_ep:
ucp_ep_destroy_internal(*ep_p);
non_cm_err_reject:
ucs_error("connection request failed on listener %p with status %s",
conn_request->listener, ucs_status_string(status));
uct_iface_reject(conn_request->uct.iface, conn_request->uct_req);
non_cm_out:
ucs_free(conn_request);
status = ucp_ep_cm_server_create_connected(worker, ep_init_flags,
&remote_addr, conn_request,
ep_p);
ucs_free(remote_addr.address_list);
ucs_assert(!ucp_worker_sockaddr_is_cm_proto(worker));
return status;
}

Expand Down
Loading

0 comments on commit cbc5f2f

Please sign in to comment.