From 2a3ad0086b66aab59f0ad1f4dc23d9e557b723de Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Fri, 13 Nov 2020 13:32:18 +0200 Subject: [PATCH] UCP/WIREUP: remove non-cm sockaddr wireup --- buildlib/io_demo/az-stage-io-demo.yaml | 1 - src/ucp/core/ucp_context.c | 94 +------- src/ucp/core/ucp_context.h | 9 - src/ucp/core/ucp_ep.c | 127 ++--------- src/ucp/core/ucp_ep.h | 23 +- src/ucp/core/ucp_ep.inl | 4 +- src/ucp/core/ucp_listener.c | 301 ++----------------------- src/ucp/core/ucp_listener.h | 2 +- src/ucp/core/ucp_worker.c | 2 +- src/ucp/core/ucp_worker.inl | 9 - src/ucp/wireup/ep_match.c | 3 +- src/ucp/wireup/select.c | 42 ---- src/ucp/wireup/wireup.c | 67 +----- src/ucp/wireup/wireup.h | 5 - src/ucp/wireup/wireup_cm.c | 43 ++-- src/ucp/wireup/wireup_cm.h | 3 +- src/ucp/wireup/wireup_ep.c | 184 --------------- src/ucp/wireup/wireup_ep.h | 3 - test/gtest/ucp/test_ucp_sockaddr.cc | 105 +++------ 19 files changed, 116 insertions(+), 911 deletions(-) diff --git a/buildlib/io_demo/az-stage-io-demo.yaml b/buildlib/io_demo/az-stage-io-demo.yaml index f8a2050d721..09e5e3b6efa 100644 --- a/buildlib/io_demo/az-stage-io-demo.yaml +++ b/buildlib/io_demo/az-stage-io-demo.yaml @@ -26,7 +26,6 @@ steps: sudo /hpc/local/bin/lshca mkdir -p $(workspace)/${{ parameters.name }} # set UCX environment variables - export UCX_SOCKADDR_CM_ENABLE=y export UCX_NET_DEVICES=$(ibdev2netdev | sed -ne 's/\(\w*\) port \([0-9]\) ==> '${roce_iface}' .*/\1:\2/p') export LD_LIBRARY_PATH=$(workspace)/install/lib:$LD_LIBRARY_PATH $(workspace)/test/apps/iodemo/run_io_demo.sh \ diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index 6ff29907548..13270d15721 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -288,13 +288,6 @@ static ucs_config_field_t ucp_config_table[] = { "of all entities which connect to each other are the same.", ucs_offsetof(ucp_config_t, ctx.unified_mode), UCS_CONFIG_TYPE_BOOL}, - {"SOCKADDR_CM_ENABLE", "y", - "Enable alternative wireup protocol for sockaddr connected endpoints.\n" - "Enabling this mode changes underlying UCT mechanism for connection\n" - "establishment and enables synchronized close protocol which does not\n" - "require out of band synchronization before destroying UCP resources.", - ucs_offsetof(ucp_config_t, ctx.sockaddr_cm_enable), UCS_CONFIG_TYPE_TERNARY}, - {"CM_USE_ALL_DEVICES", "y", "When creating client/server endpoints, use all available devices.\n" "If disabled, use only the one device on which the connection\n" @@ -959,73 +952,9 @@ static void ucp_resource_config_str(const ucp_config_t *config, char *buf, } } -static void ucp_fill_sockaddr_aux_tls_config(ucp_context_h context, - const ucp_config_t *config) -{ - const char **tl_names = (const char**)config->sockaddr_aux_tls.aux_tls; - unsigned count = config->sockaddr_aux_tls.count; - uint8_t dummy_flags = 0; - uint64_t dummy_mask = 0; - ucp_rsc_index_t tl_id; - - context->config.sockaddr_aux_rscs_bitmap = 0; - - /* Check if any of the context's resources are present in the sockaddr - * auxiliary transports for the client-server flow */ - ucs_for_each_bit(tl_id, context->tl_bitmap) { - if (ucp_is_resource_in_transports_list(context->tl_rscs[tl_id].tl_rsc.tl_name, - tl_names, count, &dummy_flags, - &dummy_mask)) { - context->config.sockaddr_aux_rscs_bitmap |= UCS_BIT(tl_id); - } - } -} - -static void ucp_fill_sockaddr_tls_prio_list(ucp_context_h context, - const char **sockaddr_tl_names, - ucp_rsc_index_t num_sockaddr_tls) -{ - uint64_t sa_tls_bitmap = 0; - ucp_rsc_index_t idx = 0; - ucp_tl_resource_desc_t *resource; - ucp_rsc_index_t tl_id; - ucp_tl_md_t *tl_md; - ucp_rsc_index_t j; - - /* Set a bitmap of sockaddr transports */ - for (j = 0; j < context->num_tls; ++j) { - resource = &context->tl_rscs[j]; - tl_md = &context->tl_mds[resource->md_index]; - if (tl_md->attr.cap.flags & UCT_MD_FLAG_SOCKADDR) { - sa_tls_bitmap |= UCS_BIT(j); - } - } - - /* Parse the sockaddr transports priority list */ - for (j = 0; j < num_sockaddr_tls; j++) { - /* go over the priority list and find the transport's tl_id in the - * sockaddr tls bitmap. save the tl_id's for the client/server usage - * later */ - ucs_for_each_bit(tl_id, sa_tls_bitmap) { - resource = &context->tl_rscs[tl_id]; - - if (!strcmp(sockaddr_tl_names[j], "*") || - !strncmp(sockaddr_tl_names[j], resource->tl_rsc.tl_name, - UCT_TL_NAME_MAX)) { - context->config.sockaddr_tl_ids[idx] = tl_id; - idx++; - sa_tls_bitmap &= ~UCS_BIT(tl_id); - } - } - } - - context->config.num_sockaddr_tls = idx; -} - static void ucp_fill_sockaddr_cms_prio_list(ucp_context_h context, const char **sockaddr_cm_names, - ucp_rsc_index_t num_sockaddr_cms, - int sockaddr_cm_enable) + ucp_rsc_index_t num_sockaddr_cms) { uint64_t cm_cmpts_bitmap = context->config.cm_cmpts_bitmap; uint64_t cm_cmpts_bitmap_safe; @@ -1034,10 +963,6 @@ static void ucp_fill_sockaddr_cms_prio_list(ucp_context_h context, memset(&context->config.cm_cmpt_idxs, UCP_NULL_RESOURCE, UCP_MAX_RESOURCES); context->config.num_cm_cmpts = 0; - if (!sockaddr_cm_enable) { - return; - } - /* Parse the sockaddr CMs priority list */ for (cm_idx = 0; cm_idx < num_sockaddr_cms; ++cm_idx) { /* go over the priority list and find the CM's cm_idx in the @@ -1061,8 +986,6 @@ static ucs_status_t ucp_fill_sockaddr_prio_list(ucp_context_h context, { const char **sockaddr_tl_names = (const char**)config->sockaddr_cm_tls.cm_tls; unsigned num_sockaddr_tls = config->sockaddr_cm_tls.count; - int sockaddr_cm_enable = context->config.ext.sockaddr_cm_enable != - UCS_NO; /* Check if a list of sockaddr transports/CMs has valid length */ if (num_sockaddr_tls > UCP_MAX_RESOURCES) { @@ -1071,13 +994,10 @@ static ucs_status_t ucp_fill_sockaddr_prio_list(ucp_context_h context, num_sockaddr_tls = UCP_MAX_RESOURCES; } - ucp_fill_sockaddr_tls_prio_list(context, sockaddr_tl_names, - num_sockaddr_tls); ucp_fill_sockaddr_cms_prio_list(context, sockaddr_tl_names, - num_sockaddr_tls, sockaddr_cm_enable); - if ((context->config.ext.sockaddr_cm_enable == UCS_YES) && - (context->config.num_cm_cmpts == 0)) { - ucs_error("UCX_SOCKADDR_CM_ENABLE is set to yes but none of the available components supports SOCKADDR_CM"); + num_sockaddr_tls); + if (context->config.num_cm_cmpts == 0) { + ucs_diag("none of the available components supports sockaddr connection management"); return UCS_ERR_UNSUPPORTED; } @@ -1274,9 +1194,8 @@ static ucs_status_t ucp_fill_resources(ucp_context_h context, max_mds += context->tl_cmpts[i].attr.md_resource_count; } - if ((context->config.ext.sockaddr_cm_enable == UCS_YES) && - (context->config.cm_cmpts_bitmap == 0)) { - ucs_error("there are no UCT components with CM capability"); + if (context->config.cm_cmpts_bitmap == 0) { + ucs_debug("there are no UCT components with CM capability"); status = UCS_ERR_UNSUPPORTED; goto err_free_resources; } @@ -1339,7 +1258,6 @@ static ucs_status_t ucp_fill_resources(ucp_context_h context, goto err_free_resources; } - ucp_fill_sockaddr_aux_tls_config(context, config); status = ucp_fill_sockaddr_prio_list(context, config); if (status != UCS_OK) { goto err_free_resources; diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index e961fb1e5ba..4c0a2047f9f 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -98,8 +98,6 @@ typedef struct ucp_context_config { int flush_worker_eps; /** Enable optimizations suitable for homogeneous systems */ int unified_mode; - /** Enable cm wireup-and-close protocol for client-server connections */ - ucs_ternary_auto_value_t sockaddr_cm_enable; /** Enable cm wireup message exchange to select the best transports * for all lanes after cm phase is done */ int cm_use_all_devices; @@ -240,13 +238,6 @@ typedef struct ucp_context { /* Cached map of components which support CM capability */ uint64_t cm_cmpts_bitmap; - /* Bitmap of sockaddr auxiliary transports to pack for client/server flow */ - uint64_t sockaddr_aux_rscs_bitmap; - - /* Array of sockaddr transports indexes. - * The indexes appear in the configured priority order */ - ucp_rsc_index_t sockaddr_tl_ids[UCP_MAX_RESOURCES]; - ucp_rsc_index_t num_sockaddr_tls; /* Array of CMs indexes. The indexes appear in the configured priority * order. */ ucp_rsc_index_t cm_cmpt_idxs[UCP_MAX_RESOURCES]; diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 26b338ec8d2..18afecd4bbb 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -229,43 +229,6 @@ static void ucp_ep_delete(ucp_ep_h ep) ucp_ep_destroy_base(ep); } -ucs_status_t -ucp_ep_create_sockaddr_aux(ucp_worker_h worker, unsigned ep_init_flags, - const ucp_unpacked_address_t *remote_address, - ucp_ep_h *ep_p) -{ - ucp_wireup_ep_t *wireup_ep; - ucs_status_t status; - ucp_ep_h ep; - - /* allocate endpoint */ - status = ucp_worker_create_ep(worker, ep_init_flags, remote_address->name, - "listener", &ep); - if (status != UCS_OK) { - goto err; - } - - status = ucp_ep_init_create_wireup(ep, ep_init_flags, &wireup_ep); - if (status != UCS_OK) { - goto err_delete; - } - - status = ucp_wireup_ep_connect_aux(wireup_ep, ep_init_flags, remote_address); - if (status != UCS_OK) { - goto err_destroy_wireup_ep; - } - - *ep_p = ep; - return status; - -err_destroy_wireup_ep: - uct_ep_destroy(ep->uct_eps[0]); -err_delete: - ucp_ep_delete(ep); -err: - return status; -} - void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key, unsigned ep_init_flags) { @@ -273,13 +236,6 @@ void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key, UCP_ERR_HANDLING_MODE_PEER : UCP_ERR_HANDLING_MODE_NONE; } -int ucp_ep_is_sockaddr_stub(ucp_ep_h ep) -{ - /* Only a sockaddr client-side endpoint may be created as a "stub" */ - return (ucp_ep_get_rsc_index(ep, 0) == UCP_NULL_RESOURCE) && - !ucp_ep_has_cm_lane(ep); -} - static ucs_status_t ucp_ep_adjust_params(ucp_ep_h ep, const ucp_ep_params_t *params) { @@ -390,6 +346,9 @@ ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags, ucp_ep_config_key_t key; ucs_status_t status; + ucs_assert(ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT); + ucs_assert(ucp_worker_num_cm_cmpts(ep->worker) != 0); + ucp_ep_config_key_reset(&key); ucp_ep_config_key_set_err_mode(&key, ep_init_flags); @@ -477,7 +436,8 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker, /* allocate endpoint */ ucs_sockaddr_str(params->sockaddr.addr, peer_name, sizeof(peer_name)); - ep_init_flags = ucp_ep_init_flags(worker, params); + ep_init_flags = ucp_ep_init_flags(worker, params) | + ucp_cm_ep_init_flags(params); status = ucp_worker_create_ep(worker, ep_init_flags, peer_name, "from api call", &ep); @@ -495,9 +455,7 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker, goto err_cleanup_lanes; } - status = ucp_worker_sockaddr_is_cm_proto(ep->worker) ? - ucp_ep_client_cm_connect_start(ep, params) : - ucp_wireup_ep_connect_to_sockaddr(ep->uct_eps[0], params); + status = ucp_ep_client_cm_connect_start(ep, params); if (status != UCS_OK) { goto err_cleanup_lanes; } @@ -544,77 +502,20 @@ ucs_status_t ucp_ep_create_server_accept(ucp_worker_h worker, return status; } - switch (sa_data->addr_mode) { - case UCP_WIREUP_SA_DATA_FULL_ADDR: - /* create endpoint to the worker address we got in the private data */ - status = ucp_ep_create_to_worker_addr(worker, UINT64_MAX, &remote_addr, - ep_init_flags | - UCP_EP_INIT_CREATE_AM_LANE, - "listener", ep_p); - if (status != UCS_OK) { - goto non_cm_err_reject; - } - - ucs_assert(ucp_ep_config(*ep_p)->key.err_mode == sa_data->err_mode); - ucp_ep_flush_state_reset(*ep_p); - ucp_ep_update_remote_id(*ep_p, sa_data->ep_id); - /* send wireup request message, to connect the client to the server's - new endpoint */ - ucs_assert(!((*ep_p)->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED)); - status = ucp_wireup_send_request(*ep_p); - if (status != UCS_OK) { - goto non_cm_err_destroy_ep; - } - break; - case UCP_WIREUP_SA_DATA_PARTIAL_ADDR: - status = ucp_ep_create_sockaddr_aux(worker, ep_init_flags, - &remote_addr, ep_p); - if (status != UCS_OK) { - goto non_cm_err_reject; - } - - ucp_ep_update_remote_id(*ep_p, sa_data->ep_id); - /* the server's ep should be aware of the sent address from the client */ - (*ep_p)->flags |= UCP_EP_FLAG_LISTENER; - /* NOTE: protect union */ - ucs_assert(!((*ep_p)->flags & (UCP_EP_FLAG_ON_MATCH_CTX | - UCP_EP_FLAG_FLUSH_STATE_VALID))); - status = ucp_wireup_send_pre_request(*ep_p); - if (status != UCS_OK) { - goto non_cm_err_destroy_ep; - } - break; - case UCP_WIREUP_SA_DATA_CM_ADDR: - ucs_assert(ucp_worker_sockaddr_is_cm_proto(worker)); - for (i = 0; i < remote_addr.address_count; ++i) { - remote_addr.address_list[i].dev_addr = conn_request->remote_dev_addr; - remote_addr.address_list[i].dev_index = conn_request->sa_data.dev_index; - } - status = ucp_ep_cm_server_create_connected(worker, ep_init_flags, - &remote_addr, conn_request, - ep_p); - ucs_free(remote_addr.address_list); - return status; - default: + if (sa_data->addr_mode != UCP_WIREUP_SA_DATA_CM_ADDR) { ucs_fatal("client sockaddr data contains invalid address mode %d", sa_data->addr_mode); } - /* common non-CM flow */ - status = uct_iface_accept(conn_request->uct.iface, - conn_request->uct_req); - goto non_cm_out; + for (i = 0; i < remote_addr.address_count; ++i) { + remote_addr.address_list[i].dev_addr = conn_request->remote_dev_addr; + remote_addr.address_list[i].dev_index = conn_request->sa_data.dev_index; + } -non_cm_err_destroy_ep: - ucp_ep_destroy_internal(*ep_p); -non_cm_err_reject: - ucs_error("connection request failed on listener %p with status %s", - conn_request->listener, ucs_status_string(status)); - uct_iface_reject(conn_request->uct.iface, conn_request->uct_req); -non_cm_out: - ucs_free(conn_request); + status = ucp_ep_cm_server_create_connected(worker, ep_init_flags, + &remote_addr, conn_request, + ep_p); ucs_free(remote_addr.address_list); - ucs_assert(!ucp_worker_sockaddr_is_cm_proto(worker)); return status; } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index dc96334adbd..6508a82b7f5 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -52,9 +52,6 @@ enum { UCP_EP_FLAG_STREAM_HAS_DATA = UCS_BIT(5), /* EP has data in the ext.stream.match_q */ UCP_EP_FLAG_ON_MATCH_CTX = UCS_BIT(6), /* EP is on match queue */ UCP_EP_FLAG_REMOTE_ID = UCS_BIT(7), /* remote ID is valid */ - UCP_EP_FLAG_LISTENER = UCS_BIT(8), /* EP holds pointer to a listener - (on server side due to receiving partial - worker address from the client) */ UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED = UCS_BIT(9), /* Pre-Connection request was queued */ UCP_EP_FLAG_CLOSED = UCS_BIT(10),/* EP was closed */ UCP_EP_FLAG_CLOSE_REQ_VALID = UCS_BIT(11),/* close protocol is started and @@ -397,10 +394,7 @@ typedef struct { ucs_ptr_map_key_t local_ep_id; /* Local EP ID */ ucs_ptr_map_key_t remote_ep_id; /* Remote EP ID */ ucp_err_handler_cb_t err_cb; /* Error handler */ - union { - ucp_listener_h listener; /* Listener that may be associated with ep */ - ucp_ep_close_proto_req_t close_req; /* Close protocol request */ - }; + ucp_ep_close_proto_req_t close_req; /* Close protocol request */ } ucp_ep_ext_control_t; @@ -441,12 +435,7 @@ typedef struct { enum { - UCP_WIREUP_SA_DATA_FULL_ADDR = 0, /* Sockaddr client data contains full - address. */ - UCP_WIREUP_SA_DATA_PARTIAL_ADDR, /* Sockaddr client data contains partial - address, wireup protocol requires - extra MSGs. */ - UCP_WIREUP_SA_DATA_CM_ADDR /* Sockaddr client data contains address + UCP_WIREUP_SA_DATA_CM_ADDR = 2 /* Sockaddr client data contains address for CM based wireup: there is only iface and ep address of transport lanes, remote device address is @@ -471,15 +460,13 @@ struct ucp_wireup_sockaddr_data { typedef struct ucp_conn_request { ucp_listener_h listener; - union { - uct_listener_h listener; - uct_iface_h iface; - } uct; + uct_listener_h uct_listener; uct_conn_request_h uct_req; ucp_rsc_index_t cm_idx; char dev_name[UCT_DEVICE_NAME_MAX]; uct_device_addr_t *remote_dev_addr; struct sockaddr_storage client_address; + ucp_ep_h ep; /* valid only if request is handled internally */ ucp_wireup_sockaddr_data_t sa_data; /* packed worker address follows */ } ucp_conn_request_t; @@ -544,8 +531,6 @@ void ucp_ep_destroy_internal(ucp_ep_h ep); void ucp_ep_cleanup_lanes(ucp_ep_h ep); -int ucp_ep_is_sockaddr_stub(ucp_ep_h ep); - ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, const ucp_ep_config_key_t *key); diff --git a/src/ucp/core/ucp_ep.inl b/src/ucp/core/ucp_ep.inl index d645ef64e80..e6ada14993f 100644 --- a/src/ucp/core/ucp_ep.inl +++ b/src/ucp/core/ucp_ep.inl @@ -145,7 +145,6 @@ static UCS_F_ALWAYS_INLINE ucp_ep_flush_state_t* ucp_ep_flush_state(ucp_ep_h ep) { ucs_assert(ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID); ucs_assert(!(ep->flags & UCP_EP_FLAG_ON_MATCH_CTX)); - ucs_assert(!(ep->flags & UCP_EP_FLAG_LISTENER)); ucs_assert(!(ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID)); return &ucp_ep_ext_gen(ep)->flush_state; } @@ -215,8 +214,7 @@ static inline void ucp_ep_flush_state_reset(ucp_ep_h ep) { ucp_ep_flush_state_t *flush_state = &ucp_ep_ext_gen(ep)->flush_state; - ucs_assert(!(ep->flags & (UCP_EP_FLAG_ON_MATCH_CTX | - UCP_EP_FLAG_LISTENER))); + ucs_assert(!(ep->flags & UCP_EP_FLAG_ON_MATCH_CTX)); ucs_assert(!(ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID) || ((flush_state->send_sn == 0) && (flush_state->cmpl_sn == 0) && diff --git a/src/ucp/core/ucp_listener.c b/src/ucp/core/ucp_listener.c index 28ae19425ad..fc9514cad09 100644 --- a/src/ucp/core/ucp_listener.c +++ b/src/ucp/core/ucp_listener.c @@ -22,128 +22,38 @@ static unsigned ucp_listener_accept_cb_progress(void *arg) { - ucp_ep_h ep = arg; - ucp_listener_h listener = ucp_ep_ext_control(ep)->listener; + ucp_conn_request_h conn_request = arg; + ucp_listener_h listener = conn_request->listener; + ucp_ep_h ep = conn_request->ep; - /* NOTE: protect union */ - ucs_assert(!(ep->flags & (UCP_EP_FLAG_ON_MATCH_CTX | - UCP_EP_FLAG_FLUSH_STATE_VALID))); - ucs_assert(ep->flags & UCP_EP_FLAG_LISTENER); + ucs_free(conn_request->remote_dev_addr); + ucs_free(conn_request); - ep->flags &= ~UCP_EP_FLAG_LISTENER; ep->flags |= UCP_EP_FLAG_USED; ucp_stream_ep_activate(ep); ucp_ep_flush_state_reset(ep); - /* - * listener is NULL if the EP was created with UCP_EP_PARAM_FIELD_EP_ADDR - * and we are here because long address requires wireup protocol - */ - if (listener && listener->accept_cb) { - listener->accept_cb(ep, listener->arg); - } - + listener->accept_cb(ep, listener->arg); return 1; } int ucp_listener_accept_cb_remove_filter(const ucs_callbackq_elem_t *elem, - void *arg) + void *arg) { - ucp_ep_h ep = elem->arg; + ucp_conn_request_h conn_request = elem->arg; - return (elem->cb == ucp_listener_accept_cb_progress) && (ep == arg); + return (elem->cb == ucp_listener_accept_cb_progress) && + (conn_request->ep == arg); } -void ucp_listener_schedule_accept_cb(ucp_ep_h ep) +void ucp_listener_schedule_accept_cb(ucp_conn_request_h conn_request) { uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL; - uct_worker_progress_register_safe(ep->worker->uct, + uct_worker_progress_register_safe(conn_request->ep->worker->uct, ucp_listener_accept_cb_progress, - ep, UCS_CALLBACKQ_FLAG_ONESHOT, - &prog_id); -} - -static unsigned ucp_listener_conn_request_progress(void *arg) -{ - ucp_conn_request_h conn_request = arg; - ucp_listener_h listener = conn_request->listener; - ucp_worker_h worker = listener->worker; - ucp_ep_h ep; - ucs_status_t status; - - ucs_trace_func("listener=%p", listener); - - if (listener->conn_cb) { - listener->conn_cb(conn_request, listener->arg); - return 1; - } - - UCS_ASYNC_BLOCK(&worker->async); - status = ucp_ep_create_server_accept(worker, conn_request, &ep); - if (status != UCS_OK) { - goto out; - } - - if (listener->accept_cb != NULL) { - if (ep->flags & UCP_EP_FLAG_LISTENER) { - ucs_assert(!(ep->flags & UCP_EP_FLAG_USED)); - ucp_ep_ext_control(ep)->listener = listener; - } else { - ep->flags |= UCP_EP_FLAG_USED; - listener->accept_cb(ep, listener->arg); - } - } - -out: - UCS_ASYNC_UNBLOCK(&worker->async); - return 1; -} - -static int ucp_listener_remove_filter(const ucs_callbackq_elem_t *elem, - void *arg) -{ - ucp_listener_h *listener = elem->arg; - - return (elem->cb == ucp_listener_conn_request_progress) && (listener == arg); -} - -static void ucp_listener_conn_request_callback(uct_iface_h tl_iface, void *arg, - uct_conn_request_h uct_req, - const void *conn_priv_data, - size_t length) -{ - ucp_listener_h listener = arg; - uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL; - ucp_conn_request_h conn_request; - - ucs_trace("listener %p: got connection request", listener); - - /* Defer wireup init and user's callback to be invoked from the main thread */ - conn_request = ucs_malloc(ucs_offsetof(ucp_conn_request_t, sa_data) + - length, "accept connection request"); - if (conn_request == NULL) { - ucs_error("failed to allocate connect request, " - "rejecting connection request %p on TL iface %p, reason %s", - uct_req, tl_iface, ucs_status_string(UCS_ERR_NO_MEMORY)); - uct_iface_reject(tl_iface, uct_req); - return; - } - - conn_request->listener = listener; - conn_request->uct_req = uct_req; - conn_request->uct.iface = tl_iface; - memset(&conn_request->client_address, 0, sizeof(struct sockaddr_storage)); - memcpy(&conn_request->sa_data, conn_priv_data, length); - - uct_worker_progress_register_safe(listener->worker->uct, - ucp_listener_conn_request_progress, - conn_request, UCS_CALLBACKQ_FLAG_ONESHOT, - &prog_id); - - /* If the worker supports the UCP_FEATURE_WAKEUP feature, signal the user so - * that he can wake-up on this event */ - ucp_worker_signal_internal(listener->worker); + conn_request, + UCS_CALLBACKQ_FLAG_ONESHOT, &prog_id); } ucs_status_t ucp_conn_request_query(ucp_conn_request_h conn_request, @@ -186,8 +96,6 @@ static void ucp_listener_close_uct_listeners(ucp_listener_h listener) { ucp_rsc_index_t i; - ucs_assert_always(ucp_worker_sockaddr_is_cm_proto(listener->worker)); - for (i = 0; i < listener->num_rscs; ++i) { uct_listener_destroy(listener->listeners[i]); } @@ -198,27 +106,8 @@ static void ucp_listener_close_uct_listeners(ucp_listener_h listener) listener->num_rscs = 0; } -static void ucp_listener_close_ifaces(ucp_listener_h listener) -{ - ucp_worker_h worker; - int i; - - ucs_assert_always(!ucp_worker_sockaddr_is_cm_proto(listener->worker)); - - for (i = 0; i < listener->num_rscs; i++) { - worker = listener->wifaces[i]->worker; - ucs_assert_always(worker == listener->worker); - /* remove pending slow-path progress in case it wasn't removed yet */ - ucs_callbackq_remove_if(&worker->uct->progress_q, - ucp_listener_remove_filter, listener); - ucp_worker_iface_cleanup(listener->wifaces[i]); - } - - ucs_free(listener->wifaces); -} - static ucs_status_t -ucp_listen_on_cm(ucp_listener_h listener, const ucp_listener_params_t *params) +ucp_listen(ucp_listener_h listener, const ucp_listener_params_t *params) { ucp_worker_h worker = listener->worker; const ucp_rsc_index_t num_cms = ucp_worker_num_cm_cmpts(worker); @@ -327,137 +216,6 @@ ucp_listen_on_cm(ucp_listener_h listener, const ucp_listener_params_t *params) return status; } -static ucs_status_t -ucp_listen_on_iface(ucp_listener_h listener, - const ucp_listener_params_t *params) -{ - ucp_worker_h worker = listener->worker; - ucp_context_h context = listener->worker->context; - int sockaddr_tls = 0; - char saddr_str[UCS_SOCKADDR_STRING_LEN]; - ucp_tl_resource_desc_t *resource; - uct_iface_params_t iface_params; - struct sockaddr_storage *listen_sock; - ucp_worker_iface_t **tmp; - ucp_rsc_index_t tl_id; - ucs_status_t status; - ucp_tl_md_t *tl_md; - uint16_t port; - int i; - - status = ucs_sockaddr_get_port(params->sockaddr.addr, &port); - if (status != UCS_OK) { - return status; - } - - /* Go through all the available resources and for each one, check if the given - * sockaddr is accessible from its md. Start listening on all the mds that - * satisfy this. - * If the given port is set to 0, i.e. use a random port, the first transport - * in the sockaddr priority list from the environment configuration will - * dictate the port to listen on for the other sockaddr transports in the list. - * */ - for (i = 0; i < context->config.num_sockaddr_tls; i++) { - tl_id = context->config.sockaddr_tl_ids[i]; - resource = &context->tl_rscs[tl_id]; - tl_md = &context->tl_mds[resource->md_index]; - - if (!uct_md_is_sockaddr_accessible(tl_md->md, ¶ms->sockaddr, - UCT_SOCKADDR_ACC_LOCAL)) { - continue; - } - - tmp = ucs_realloc(listener->wifaces, - sizeof(*tmp) * (sockaddr_tls + 1), - "listener wifaces"); - if (tmp == NULL) { - ucs_error("failed to allocate listener wifaces"); - status = UCS_ERR_NO_MEMORY; - goto err_close_listener_wifaces; - } - - listener->wifaces = tmp; - - iface_params.field_mask = UCT_IFACE_PARAM_FIELD_OPEN_MODE | - UCT_IFACE_PARAM_FIELD_SOCKADDR; - iface_params.open_mode = UCT_IFACE_OPEN_MODE_SOCKADDR_SERVER; - iface_params.mode.sockaddr.conn_request_cb = ucp_listener_conn_request_callback; - iface_params.mode.sockaddr.conn_request_arg = listener; - iface_params.mode.sockaddr.listen_sockaddr = params->sockaddr; - iface_params.mode.sockaddr.cb_flags = UCT_CB_FLAG_ASYNC; - - if (port) { - /* Set the port for the next sockaddr iface. This port was either - * obtained from the user or generated by the first created sockaddr - * iface if the port from the user was equal to zero */ - status = ucs_sockaddr_set_port( - (struct sockaddr *) - iface_params.mode.sockaddr.listen_sockaddr.addr, port); - if (status != UCS_OK) { - ucs_error("failed to set port parameter (%d) for creating %s iface", - port, resource->tl_rsc.tl_name); - goto err_close_listener_wifaces; - } - } - - status = ucp_worker_iface_open(worker, tl_id, &iface_params, - &listener->wifaces[sockaddr_tls]); - if (status != UCS_OK) { - ucs_error("failed to open listener on %s on md %s", - ucs_sockaddr_str( - iface_params.mode.sockaddr.listen_sockaddr.addr, - saddr_str, sizeof(saddr_str)), - tl_md->rsc.md_name); - goto err_close_listener_wifaces; - } - - status = ucp_worker_iface_init(worker, tl_id, - listener->wifaces[sockaddr_tls]); - if ((status != UCS_OK) || - ((context->config.features & UCP_FEATURE_WAKEUP) && - !(listener->wifaces[sockaddr_tls]->attr.cap.flags & - UCT_IFACE_FLAG_CB_ASYNC))) { - ucp_worker_iface_cleanup(listener->wifaces[sockaddr_tls]); - goto err_close_listener_wifaces; - } - - listen_sock = &listener->wifaces[sockaddr_tls]->attr.listen_sockaddr; - status = ucs_sockaddr_get_port((struct sockaddr *)listen_sock, &port); - if (status != UCS_OK) { - goto err_close_listener_wifaces; - } - - sockaddr_tls++; - listener->num_rscs = sockaddr_tls; - ucs_trace("listener %p: accepting connections on %s on %s", - listener, tl_md->rsc.md_name, - ucs_sockaddr_str(iface_params.mode.sockaddr.listen_sockaddr.addr, - saddr_str, sizeof(saddr_str))); - } - - if (!sockaddr_tls) { - ucs_error("none of the available transports can listen for connections on %s", - ucs_sockaddr_str(params->sockaddr.addr, saddr_str, - sizeof(saddr_str))); - listener->num_rscs = 0; - status = UCS_ERR_UNREACHABLE; - goto err_close_listener_wifaces; - } - - listen_sock = &listener->wifaces[sockaddr_tls - 1]->attr.listen_sockaddr; - status = ucs_sockaddr_copy((struct sockaddr *)&listener->sockaddr, - (struct sockaddr *)listen_sock); - if (status != UCS_OK) { - goto err_close_listener_wifaces; - } - - return UCS_OK; - -err_close_listener_wifaces: - ucp_listener_close_ifaces(listener); - return status; -} - ucs_status_t ucp_listener_create(ucp_worker_h worker, const ucp_listener_params_t *params, ucp_listener_h *listener_p) @@ -479,6 +237,11 @@ ucs_status_t ucp_listener_create(ucp_worker_h worker, return UCS_ERR_INVALID_PARAM; } + if (ucp_worker_num_cm_cmpts(worker) == 0) { + ucs_error("cannot create listener: none of the available components supports it"); + return UCS_ERR_UNSUPPORTED; + } + listener = ucs_calloc(1, sizeof(*listener), "ucp_listener"); if (listener == NULL) { ucs_error("cannot allocate memory for UCP listener"); @@ -501,12 +264,7 @@ ucs_status_t ucp_listener_create(ucp_worker_h worker, listener->arg = params->conn_handler.arg; } - if (ucp_worker_sockaddr_is_cm_proto(worker)) { - status = ucp_listen_on_cm(listener, params); - } else { - status = ucp_listen_on_iface(listener, params); - } - + status = ucp_listen(listener, params); if (status == UCS_OK) { *listener_p = listener; goto out; @@ -529,12 +287,7 @@ void ucp_listener_destroy(ucp_listener_h listener) listener); UCS_ASYNC_UNBLOCK(&listener->worker->async); - if (ucp_worker_sockaddr_is_cm_proto(listener->worker)) { - ucp_listener_close_uct_listeners(listener); - } else { - ucp_listener_close_ifaces(listener); - } - + ucp_listener_close_uct_listeners(listener); ucs_free(listener); } @@ -546,14 +299,8 @@ ucs_status_t ucp_listener_reject(ucp_listener_h listener, ucs_trace("listener %p: free conn_request %p", listener, conn_request); UCS_ASYNC_BLOCK(&worker->async); - - if (ucp_worker_sockaddr_is_cm_proto(worker)) { - uct_listener_reject(conn_request->uct.listener, conn_request->uct_req); - ucs_free(conn_request->remote_dev_addr); - } else { - uct_iface_reject(conn_request->uct.iface, conn_request->uct_req); - } - + uct_listener_reject(conn_request->uct_listener, conn_request->uct_req); + ucs_free(conn_request->remote_dev_addr); UCS_ASYNC_UNBLOCK(&worker->async); ucs_free(conn_request); diff --git a/src/ucp/core/ucp_listener.h b/src/ucp/core/ucp_listener.h index 5385a2e93dc..c4f9b35469f 100644 --- a/src/ucp/core/ucp_listener.h +++ b/src/ucp/core/ucp_listener.h @@ -40,7 +40,7 @@ typedef struct ucp_listener { } ucp_listener_t; -void ucp_listener_schedule_accept_cb(ucp_ep_h ep); +void ucp_listener_schedule_accept_cb(ucp_conn_request_h conn_request); int ucp_listener_accept_cb_remove_filter(const ucs_callbackq_elem_t *elem, void *arg); diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index dc1145b3fe4..79c40d959c7 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -1402,7 +1402,7 @@ static ucs_status_t ucp_worker_add_resource_cms(ucp_worker_h worker) ucp_rsc_index_t cmpt_index, cm_cmpt_index, i; ucs_status_t status; - if (!ucp_worker_sockaddr_is_cm_proto(worker)) { + if (ucp_worker_num_cm_cmpts(worker) == 0) { worker->cms = NULL; return UCS_OK; } diff --git a/src/ucp/core/ucp_worker.inl b/src/ucp/core/ucp_worker.inl index e804d524b32..a7ef6d269e6 100644 --- a/src/ucp/core/ucp_worker.inl +++ b/src/ucp/core/ucp_worker.inl @@ -183,15 +183,6 @@ ucp_worker_num_cm_cmpts(const ucp_worker_h worker) return worker->context->config.num_cm_cmpts; } -/** - * @return whether the worker should be using connection manager mode - */ -static UCS_F_ALWAYS_INLINE int -ucp_worker_sockaddr_is_cm_proto(const ucp_worker_h worker) -{ - return !!ucp_worker_num_cm_cmpts(worker); -} - /** * Check if interface with @a iface_attr supports point-to-point connections. * diff --git a/src/ucp/wireup/ep_match.c b/src/ucp/wireup/ep_match.c index 8b01b76abbe..cc6fe8a4195 100644 --- a/src/ucp/wireup/ep_match.c +++ b/src/ucp/wireup/ep_match.c @@ -75,8 +75,7 @@ void ucp_ep_match_insert(ucp_worker_h worker, ucp_ep_h ep, uint64_t dest_uuid, !(ep->flags & UCP_EP_FLAG_REMOTE_ID)); /* NOTE: protect union */ ucs_assert(!(ep->flags & (UCP_EP_FLAG_ON_MATCH_CTX | - UCP_EP_FLAG_FLUSH_STATE_VALID | - UCP_EP_FLAG_LISTENER))); + UCP_EP_FLAG_FLUSH_STATE_VALID))); /* EP matching is not used in CM flow */ ucs_assert(!ucp_ep_has_cm_lane(ep)); ep->flags |= UCP_EP_FLAG_ON_MATCH_CTX; diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 89d9268c8a2..7fc81c2fbed 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -1762,45 +1762,3 @@ ucp_wireup_select_aux_transport(ucp_ep_h ep, unsigned ep_init_flags, UINT64_MAX, UINT64_MAX, UINT64_MAX, UINT64_MAX, 1, select_info); } - -ucs_status_t -ucp_wireup_select_sockaddr_transport(const ucp_context_h context, - const ucs_sock_addr_t *sockaddr, - ucp_rsc_index_t *rsc_index_p) -{ - char saddr_str[UCS_SOCKADDR_STRING_LEN]; - ucp_tl_resource_desc_t *resource; - ucp_rsc_index_t tl_id; - ucp_md_index_t md_index; - uct_md_h md; - int i; - - /* Go over the sockaddr transports priority array and try to use the transports - * one by one for the client side */ - for (i = 0; i < context->config.num_sockaddr_tls; i++) { - tl_id = context->config.sockaddr_tl_ids[i]; - resource = &context->tl_rscs[tl_id]; - md_index = resource->md_index; - md = context->tl_mds[md_index].md; - - ucs_assert(context->tl_mds[md_index].attr.cap.flags & - UCT_MD_FLAG_SOCKADDR); - - /* The client selects the transport for sockaddr according to the - * configuration. We rely on the server having this transport available - * as well */ - if (uct_md_is_sockaddr_accessible(md, sockaddr, - UCT_SOCKADDR_ACC_REMOTE)) { - *rsc_index_p = tl_id; - ucs_debug("sockaddr transport selected: %s", resource->tl_rsc.tl_name); - return UCS_OK; - } - - ucs_debug("md %s cannot reach %s", - context->tl_mds[md_index].rsc.md_name, - ucs_sockaddr_str(sockaddr->addr, saddr_str, - sizeof(saddr_str))); - } - - return UCS_ERR_UNREACHABLE; -} diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 8bfeecf926e..6e127eda084 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -137,11 +137,6 @@ ucs_status_t ucp_wireup_msg_progress(uct_pending_req_t *self) return UCS_OK; } -static inline int ucp_wireup_is_ep_needed(ucp_ep_h ep) -{ - return (ep != NULL) && !(ep->flags & UCP_EP_FLAG_LISTENER); -} - /* * @param [in] rsc_tli Resource index for every lane. */ @@ -180,10 +175,9 @@ ucp_wireup_msg_send(ucp_ep_h ep, uint8_t type, uint64_t tl_bitmap, ucp_request_send_state_init(req, ucp_dt_make_contig(1), 0); /* pack all addresses */ - status = ucp_address_pack(ep->worker, - ucp_wireup_is_ep_needed(ep) ? ep : NULL, - tl_bitmap, UCP_ADDRESS_PACK_FLAGS_ALL, - lanes2remote, &req->send.length, &address); + status = ucp_address_pack(ep->worker, ep, tl_bitmap, + UCP_ADDRESS_PACK_FLAGS_ALL, lanes2remote, + &req->send.length, &address); if (status != UCS_OK) { ucs_free(req); ucs_error("failed to pack address: %s", ucs_status_string(status)); @@ -433,7 +427,6 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, ucp_rsc_index_t lanes2remote[UCP_MAX_LANES]; unsigned addr_indices[UCP_MAX_LANES]; ucs_status_t status; - ucp_ep_flags_t listener_flag; ucp_ep_h ep; int has_cm_lane; @@ -446,12 +439,7 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, /* wireup request for a specific ep */ ep = ucp_worker_get_ep_by_id(worker, msg->dst_ep_id); ucp_ep_update_remote_id(ep, msg->src_ep_id); - if (!(ep->flags & UCP_EP_FLAG_LISTENER)) { - /* Reset flush state only if it's not a client-server wireup on - * server side with long address exchange when listener (united with - * flush state) should be valid until user's callback invoking */ - ucp_ep_flush_state_reset(ep); - } + ucp_ep_flush_state_reset(ep); ep_init_flags |= UCP_EP_INIT_CREATE_AM_LANE; } else { ep = ucp_ep_match_retrieve(worker, remote_uuid, @@ -494,15 +482,6 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, has_cm_lane = ucp_ep_has_cm_lane(ep); - if (ep->flags & UCP_EP_FLAG_LISTENER) { - /* If this is an ep on a listener (server) that received a partial - * worker address from the client, then the following lanes initialization - * will be done after an aux lane was already created on this ep. - * Therefore, remove the existing aux endpoint since will need to create - * new lanes now */ - ucp_ep_cleanup_lanes(ep); - } - if (msg->err_mode == UCP_ERR_HANDLING_MODE_PEER) { ep_init_flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE; } @@ -556,27 +535,8 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, } if (send_reply) { - listener_flag = ep->flags & UCP_EP_FLAG_LISTENER; - /* Remove this flag at this point if it's set - * (so that address packing would be correct) */ - ep->flags &= ~UCP_EP_FLAG_LISTENER; - ucs_trace("ep %p: sending wireup reply", ep); - status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_REPLY, tl_bitmap, - lanes2remote); - if (status != UCS_OK) { - return; - } - - /* Restore saved flag value */ - ep->flags |= listener_flag; - } else { - /* if in client-server flow, schedule invoking the user's callback - * (if server is connected) from the main thread */ - if (ucs_test_all_flags(ep->flags, - (UCP_EP_FLAG_LISTENER | UCP_EP_FLAG_LOCAL_CONNECTED))) { - ucp_listener_schedule_accept_cb(ep); - } + ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_REPLY, tl_bitmap, lanes2remote); } } @@ -611,7 +571,6 @@ ucp_wireup_process_reply(ucp_worker_h worker, const ucp_wireup_msg_t *msg, ep = ucp_worker_get_ep_by_id(worker, msg->dst_ep_id); ucs_assert(msg->type == UCP_WIREUP_MSG_REPLY); - ucs_assert((!(ep->flags & UCP_EP_FLAG_LISTENER))); ucs_trace("ep %p: got wireup reply src_ep_id 0x%"PRIx64 " dst_ep_id 0x%"PRIx64" sn %d", ep, msg->src_ep_id, msg->dst_ep_id, msg->conn_sn); @@ -673,13 +632,6 @@ void ucp_wireup_process_ack(ucp_worker_h worker, const ucp_wireup_msg_t *msg) } ucp_wireup_remote_connected(ep); - - /* if this ack is received as part of the client-server flow, when handling - * a large worker address from the client, invoke the cached user callback - * from the main thread */ - if (ep->flags & UCP_EP_FLAG_LISTENER) { - ucp_listener_schedule_accept_cb(ep); - } } static ucs_status_t ucp_wireup_msg_handler(void *arg, void *data, @@ -1209,8 +1161,8 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, } if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) && - /* reconfiguration is allowed for CM and sockaddr flows */ - !ucp_ep_is_sockaddr_stub(ep) && !ucp_ep_has_cm_lane(ep)) { + /* reconfiguration is allowed for CM flow */ + !ucp_ep_has_cm_lane(ep)) { /* * TODO handle a case where we have to change lanes and reconfigure the ep: * @@ -1304,8 +1256,7 @@ ucs_status_t ucp_wireup_send_pre_request(ucp_ep_h ep) uint64_t tl_bitmap = UINT64_MAX; /* pack full worker address */ ucs_status_t status; - ucs_assert((ep->flags & UCP_EP_FLAG_LISTENER) || - ucp_ep_has_cm_lane(ep)); + ucs_assert(ucp_ep_has_cm_lane(ep)); ucs_assert(!(ep->flags & UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED)); ucs_debug("ep %p: send wireup pre-request (flags=0x%x)", ep, ep->flags); @@ -1470,7 +1421,7 @@ ucp_ep_params_err_handling_mode(const ucp_ep_params_t *params) unsigned ucp_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params) { - unsigned flags = ucp_cm_ep_init_flags(worker, params); + unsigned flags = ucp_cm_ep_init_flags(params); if (ucp_ep_init_flags_has_cm(flags) && worker->context->config.ext.cm_use_all_devices) { diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index 73322cdf050..cd99d8272c5 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -100,11 +100,6 @@ ucp_wireup_select_aux_transport(ucp_ep_h ep, unsigned ep_init_flags, const ucp_unpacked_address_t *remote_address, ucp_wireup_select_info_t *select_info); -ucs_status_t -ucp_wireup_select_sockaddr_transport(const ucp_context_h context, - const ucs_sock_addr_t *sockaddr, - ucp_rsc_index_t *rsc_index_p); - double ucp_wireup_amo_score_func(ucp_context_h context, const uct_md_attr_t *md_attr, const uct_iface_attr_t *iface_attr, diff --git a/src/ucp/wireup/wireup_cm.c b/src/ucp/wireup/wireup_cm.c index ff87913d590..b240cc20f8f 100644 --- a/src/ucp/wireup/wireup_cm.c +++ b/src/ucp/wireup/wireup_cm.c @@ -18,12 +18,8 @@ unsigned -ucp_cm_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params) +ucp_cm_ep_init_flags(const ucp_ep_params_t *params) { - if (!ucp_worker_sockaddr_is_cm_proto(worker)) { - return 0; - } - if (params->field_mask & UCP_EP_PARAM_FIELD_SOCK_ADDR) { return UCP_EP_INIT_CM_WIREUP_CLIENT | UCP_EP_INIT_CM_PHASE; } @@ -809,6 +805,7 @@ static unsigned ucp_cm_server_conn_request_progress(void *arg) return 1; } + ucs_assert(listener->accept_cb != NULL); UCS_ASYNC_BLOCK(&worker->async); ucp_ep_create_server_accept(worker, conn_request, &ep); UCS_ASYNC_UNBLOCK(&worker->async); @@ -908,9 +905,10 @@ void ucp_cm_server_conn_request_cb(uct_listener_h listener, void *arg, } ucp_conn_request->listener = ucp_listener; - ucp_conn_request->uct.listener = listener; + ucp_conn_request->uct_listener = listener; ucp_conn_request->uct_req = conn_request; ucp_conn_request->cm_idx = cm_idx; + ucp_conn_request->ep = NULL; status = ucs_sockaddr_copy((struct sockaddr *)&ucp_conn_request->client_address, conn_req_args->client_address.addr); @@ -970,7 +968,7 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags, client_addr_str, sizeof(client_addr_str)), conn_request->dev_name); status = UCS_ERR_UNREACHABLE; - goto out; + goto out_free_request; } /* Create and connect TL part */ @@ -981,8 +979,8 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags, ucs_warn("failed to create server ep and connect to worker address on " "device %s, tl_bitmap 0x%"PRIx64", status %s", conn_request->dev_name, tl_bitmap, ucs_status_string(status)); - uct_listener_reject(conn_request->uct.listener, conn_request->uct_req); - goto out; + uct_listener_reject(conn_request->uct_listener, conn_request->uct_req); + goto out_free_request; } status = ucp_wireup_connect_local(ep, remote_addr, NULL); @@ -991,11 +989,11 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags, "device %s, tl_bitmap 0x%"PRIx64", status %s", ep, conn_request->dev_name, tl_bitmap, ucs_status_string(status)); - uct_listener_reject(conn_request->uct.listener, conn_request->uct_req); + uct_listener_reject(conn_request->uct_listener, conn_request->uct_req); goto err_destroy_ep; } - status = ucp_ep_cm_connect_server_lane(ep, conn_request->uct.listener, + status = ucp_ep_cm_connect_server_lane(ep, conn_request->uct_listener, conn_request->uct_req, conn_request->cm_idx); if (status != UCS_OK) { @@ -1006,21 +1004,26 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags, goto err_destroy_ep; } - ep->flags |= UCP_EP_FLAG_LISTENER; - ucp_ep_ext_control(ep)->listener = conn_request->listener; ucp_ep_update_remote_id(ep, conn_request->sa_data.ep_id); - ucp_listener_schedule_accept_cb(ep); - *ep_p = ep; + if (conn_request->listener->accept_cb == NULL) { + goto out_free_request; + } else { + conn_request->ep = ep; + ucp_listener_schedule_accept_cb(conn_request); + goto out; + } -out: +err_destroy_ep: + ucp_ep_destroy_internal(ep); +out_free_request: ucs_free(conn_request->remote_dev_addr); ucs_free(conn_request); +out: + if (status == UCS_OK) { + *ep_p = ep; + } return status; - -err_destroy_ep: - ucp_ep_destroy_internal(ep); - goto out; } static ssize_t ucp_cm_server_priv_pack_cb(void *arg, diff --git a/src/ucp/wireup/wireup_cm.h b/src/ucp/wireup/wireup_cm.h index 14f5ac0f6f6..05c9d7020cc 100644 --- a/src/ucp/wireup/wireup_cm.h +++ b/src/ucp/wireup/wireup_cm.h @@ -19,8 +19,7 @@ typedef struct ucp_cm_client_connect_progress_arg { } ucp_cm_client_connect_progress_arg_t; -unsigned ucp_cm_ep_init_flags(const ucp_worker_h worker, - const ucp_ep_params_t *params); +unsigned ucp_cm_ep_init_flags(const ucp_ep_params_t *params); int ucp_ep_init_flags_has_cm(unsigned ep_init_flags); diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index af66930039f..de3b7f29879 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -519,190 +519,6 @@ ucs_status_t ucp_wireup_ep_connect(uct_ep_h uct_ep, unsigned ep_init_flags, return status; } -static ucs_status_t ucp_wireup_ep_pack_sockaddr_aux_tls(ucp_worker_h worker, - const char *dev_name, - uint64_t *tl_bitmap_p, - ucp_address_t **address_p, - size_t *address_length_p) -{ - ucp_context_h context = worker->context; - int tl_id, found_supported_tl = 0; - ucs_status_t status; - uint64_t tl_bitmap = 0; - - /* Find a transport which matches the given dev_name and the user's configuration. - * It also has to be a UCT_IFACE_FLAG_CONNECT_TO_IFACE transport and support - * active messaging for sending a wireup message */ - ucs_for_each_bit(tl_id, context->config.sockaddr_aux_rscs_bitmap) { - if ((!strncmp(context->tl_rscs[tl_id].tl_rsc.dev_name, dev_name, - UCT_DEVICE_NAME_MAX)) && - (ucs_test_all_flags(ucp_worker_iface_get_attr(worker, tl_id)->cap.flags, - UCT_IFACE_FLAG_CONNECT_TO_IFACE | - UCT_IFACE_FLAG_AM_BCOPY))) { - found_supported_tl = 1; - tl_bitmap |= UCS_BIT(tl_id); - } - } - - if (found_supported_tl) { - status = ucp_address_pack(worker, NULL, tl_bitmap, - UCP_ADDRESS_PACK_FLAGS_ALL, NULL, - address_length_p, (void**)address_p); - } else { - ucs_error("no supported sockaddr auxiliary transports found for %s", dev_name); - status = UCS_ERR_UNREACHABLE; - } - - *tl_bitmap_p = tl_bitmap; - return status; -} - -ssize_t ucp_wireup_ep_sockaddr_fill_private_data(void *arg, - const uct_cm_ep_priv_data_pack_args_t - *pack_args, void *priv_data) -{ - ucp_wireup_sockaddr_data_t *sa_data = priv_data; - ucp_wireup_ep_t *wireup_ep = arg; - ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; - ucp_rsc_index_t sockaddr_rsc = wireup_ep->sockaddr_rsc_index; - ucp_worker_h worker = ucp_ep->worker; - ucp_context_h context = worker->context; - size_t address_length, conn_priv_len; - ucp_address_t *worker_address, *rsc_address; - uct_iface_attr_t *attrs; - ucs_status_t status; - uint64_t tl_bitmap; - char aux_tls_str[64]; - const char *dev_name; - - ucs_assert_always(pack_args->field_mask & - UCT_CM_EP_PRIV_DATA_PACK_ARGS_FIELD_DEVICE_NAME); - - dev_name = pack_args->dev_name; - - status = ucp_address_pack(worker, NULL, UINT64_MAX, - UCP_ADDRESS_PACK_FLAGS_ALL, NULL, - &address_length, (void**)&worker_address); - if (status != UCS_OK) { - goto err; - } - - conn_priv_len = sizeof(*sa_data) + address_length; - - /* pack client data */ - ucs_assert((int)ucp_ep_config(ucp_ep)->key.err_mode <= UINT8_MAX); - sa_data->err_mode = ucp_ep_config(ucp_ep)->key.err_mode; - sa_data->ep_id = ucp_ep_local_id(ucp_ep); - sa_data->dev_index = UCP_NULL_RESOURCE; /* Not used */ - - attrs = ucp_worker_iface_get_attr(worker, sockaddr_rsc); - - /* check private data length limitation */ - if (conn_priv_len > attrs->max_conn_priv) { - - /* since the full worker address is too large to fit into the trasnport's - * private data, try to pack sockaddr aux tls to pass in the address */ - status = ucp_wireup_ep_pack_sockaddr_aux_tls(worker, dev_name, - &tl_bitmap, &rsc_address, - &address_length); - if (status != UCS_OK) { - goto err_free_address; - } - - conn_priv_len = sizeof(*sa_data) + address_length; - - /* check the private data length limitation again, now with partial - * resources packed (and not the entire worker address) */ - if (conn_priv_len > attrs->max_conn_priv) { - ucs_error("sockaddr aux resources addresses (%s transports)" - " information (%zu) exceeds max_priv on " - UCT_TL_RESOURCE_DESC_FMT" (%zu)", - ucp_tl_bitmap_str(context, tl_bitmap, aux_tls_str, - sizeof(aux_tls_str)), - conn_priv_len, - UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[sockaddr_rsc].tl_rsc), - attrs->max_conn_priv); - status = UCS_ERR_UNREACHABLE; - ucs_free(rsc_address); - goto err_free_address; - } - - sa_data->addr_mode = UCP_WIREUP_SA_DATA_PARTIAL_ADDR; - memcpy(sa_data + 1, rsc_address, address_length); - ucp_ep->flags |= UCP_EP_FLAG_SOCKADDR_PARTIAL_ADDR; - - ucs_free(rsc_address); - - ucs_trace("sockaddr tl ("UCT_TL_RESOURCE_DESC_FMT") sending partial address: " - "(%s transports) (len=%zu) to server. " - "total client priv data len: %zu", - UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[sockaddr_rsc].tl_rsc), - ucp_tl_bitmap_str(context, tl_bitmap, aux_tls_str, - sizeof(aux_tls_str)), - address_length, conn_priv_len); - } else { - sa_data->addr_mode = UCP_WIREUP_SA_DATA_FULL_ADDR; - memcpy(sa_data + 1, worker_address, address_length); - } - - ucp_worker_release_address(worker, worker_address); - return conn_priv_len; - -err_free_address: - ucp_worker_release_address(worker, worker_address); -err: - return status; -} - -ucs_status_t ucp_wireup_ep_connect_to_sockaddr(uct_ep_h uct_ep, - const ucp_ep_params_t *params) -{ - ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); - ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; - ucp_worker_h worker = ucp_ep->worker; - char saddr_str[UCS_SOCKADDR_STRING_LEN]; - uct_ep_params_t uct_ep_params; - ucp_rsc_index_t sockaddr_rsc; - ucp_worker_iface_t *wiface; - ucs_status_t status; - - ucs_assert(ucp_wireup_ep_test(uct_ep)); - - status = ucp_wireup_select_sockaddr_transport(worker->context, - ¶ms->sockaddr, - &sockaddr_rsc); - if (status != UCS_OK) { - goto out; - } - - wiface = ucp_worker_iface(worker, sockaddr_rsc); - - wireup_ep->sockaddr_rsc_index = sockaddr_rsc; - - /* Fill parameters and send connection request using the transport */ - uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE | - UCT_EP_PARAM_FIELD_USER_DATA | - UCT_EP_PARAM_FIELD_SOCKADDR | - UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS | - UCT_EP_PARAM_FIELD_SOCKADDR_PACK_CB; - uct_ep_params.iface = wiface->iface; - uct_ep_params.sockaddr = ¶ms->sockaddr; - uct_ep_params.user_data = wireup_ep; - uct_ep_params.sockaddr_cb_flags = UCT_CB_FLAG_ASYNC; - uct_ep_params.sockaddr_pack_cb = ucp_wireup_ep_sockaddr_fill_private_data; - status = uct_ep_create(&uct_ep_params, &wireup_ep->sockaddr_ep); - if (status != UCS_OK) { - goto out; - } - - ucs_debug("ep %p connecting to %s", ucp_ep, - ucs_sockaddr_str(params->sockaddr.addr, saddr_str, sizeof(saddr_str))); - status = UCS_OK; - -out: - return status; -} - void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); diff --git a/src/ucp/wireup/wireup_ep.h b/src/ucp/wireup/wireup_ep.h index 0e0e2339919..2c611b14e06 100644 --- a/src/ucp/wireup/wireup_ep.h +++ b/src/ucp/wireup/wireup_ep.h @@ -81,9 +81,6 @@ ucs_status_t ucp_wireup_ep_connect(uct_ep_h uct_ep, unsigned ucp_ep_init_flags, unsigned path_index, int connect_aux, const ucp_unpacked_address_t *remote_address); -ucs_status_t ucp_wireup_ep_connect_to_sockaddr(uct_ep_h uct_ep, - const ucp_ep_params_t *params); - void ucp_wireup_ep_pending_queue_purge(uct_ep_h uct_ep, uct_pending_purge_callback_t cb, void *arg); diff --git a/test/gtest/ucp/test_ucp_sockaddr.cc b/test/gtest/ucp/test_ucp_sockaddr.cc index 17fae8257e4..4d32f0ae688 100644 --- a/test/gtest/ucp/test_ucp_sockaddr.cc +++ b/test/gtest/ucp/test_ucp_sockaddr.cc @@ -43,8 +43,7 @@ class test_ucp_sockaddr : public ucp_test { enum { TEST_MODIFIER_MASK = UCS_MASK(16), - TEST_MODIFIER_MT = UCS_BIT(16), - TEST_MODIFIER_CM = UCS_BIT(17) + TEST_MODIFIER_MT = UCS_BIT(16) }; enum { @@ -62,10 +61,6 @@ class test_ucp_sockaddr : public ucp_test { void init() { m_err_count = 0; - - if (get_variant_value() & TEST_MODIFIER_CM) { - modify_config("SOCKADDR_CM_ENABLE", "yes"); - } get_sockaddr(); ucp_test::init(); skip_loopback(); @@ -84,10 +79,6 @@ class test_ucp_sockaddr : public ucp_test { uint64_t features = UCP_FEATURE_TAG | UCP_FEATURE_STREAM) { get_test_variants_mt(variants, features, CONN_REQ_TAG, "tag"); get_test_variants_mt(variants, features, CONN_REQ_STREAM, "stream"); - get_test_variants_mt(variants, features, CONN_REQ_TAG | TEST_MODIFIER_CM, - "tag,cm"); - get_test_variants_mt(variants, features, CONN_REQ_STREAM | TEST_MODIFIER_CM, - "stream,cm"); } static ucs_log_func_rc_t @@ -131,11 +122,6 @@ class test_ucp_sockaddr : public ucp_test { * only IPoIB IP addresses. therefore, if the interface * isn't as such, we continue to the next one. */ skip = 1; - } else if (!ucs::is_rdmacm_netdev(ifa->ifa_name) && - !(get_variant_value() & TEST_MODIFIER_CM)) { - /* old client-server API (without CM) ran only with - * IPoIB/RoCE interface */ - skip = 1; } else if ((has_transport("tcp") || has_transport("all")) && (ifa->ifa_addr->sa_family == AF_INET6)) { /* the tcp transport (and 'all' which may fallback to tcp_sockcmm) @@ -565,11 +551,7 @@ class test_ucp_sockaddr : public ucp_test { bool nonparameterized_test() const { return (get_variant_value() != DEFAULT_PARAM_VARIANT) && - (get_variant_value() != (CONN_REQ_TAG | TEST_MODIFIER_CM)); - } - - bool no_close_protocol() const { - return !(get_variant_value() & TEST_MODIFIER_CM); + (get_variant_value() != CONN_REQ_TAG); } static void cmp_cfg_lanes(ucp_ep_config_key_t *key1, ucp_lane_index_t lane1, @@ -585,8 +567,7 @@ class test_ucp_sockaddr : public ucp_test { unsigned test_ucp_sockaddr::m_err_count = 0; - -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, listen, no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, listen) { listen_and_communicate(false, 0); } @@ -594,7 +575,7 @@ UCS_TEST_P(test_ucp_sockaddr, listen_c2s) { listen_and_communicate(false, SEND_DIRECTION_C2S); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, listen_s2c, no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, listen_s2c) { listen_and_communicate(false, SEND_DIRECTION_S2C); } @@ -602,8 +583,7 @@ UCS_TEST_P(test_ucp_sockaddr, listen_bidi) { listen_and_communicate(false, SEND_DIRECTION_BIDI); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect) { listen_and_communicate(false, 0); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH); } @@ -613,8 +593,7 @@ UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect_c2s) { one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect_s2c, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect_s2c) { listen_and_communicate(false, SEND_DIRECTION_S2C); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH); } @@ -650,9 +629,7 @@ UCS_TEST_P(test_ucp_sockaddr, close_callback) { } } - -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect_bidi_wait_err_cb, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect_bidi_wait_err_cb) { listen_and_communicate(false, SEND_DIRECTION_BIDI); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH); @@ -660,20 +637,17 @@ UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect_bidi_wait_err_cb, EXPECT_EQ(1u, m_err_count); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect) { listen_and_communicate(false, 0); concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_c2s, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_c2s) { listen_and_communicate(false, SEND_DIRECTION_C2S); concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_s2c, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_s2c) { listen_and_communicate(false, SEND_DIRECTION_S2C); concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH); } @@ -683,20 +657,17 @@ UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_bidi) { concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_force) { listen_and_communicate(false, 0); concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force_c2s, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_force_c2s) { listen_and_communicate(false, SEND_DIRECTION_C2S); concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force_s2c, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_force_s2c) { listen_and_communicate(false, SEND_DIRECTION_S2C); concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE); } @@ -764,8 +735,7 @@ UCS_TEST_P(test_ucp_sockaddr, err_handle) { EXPECT_EQ(1u, sender().get_err_num()); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, compare_cm_and_wireup_configs, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr, compare_cm_and_wireup_configs) { ucp_worker_cfg_index_t cm_ep_cfg_index, wireup_ep_cfg_index; ucp_ep_config_key_t *cm_ep_cfg_key, *wireup_ep_cfg_key; @@ -872,84 +842,72 @@ class test_ucp_sockaddr_destroy_ep_on_err : public test_ucp_sockaddr { ucs::ptr_vector m_env; }; -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, empty, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, empty) { listen_and_communicate(false, 0); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, s2c, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, s2c) { listen_and_communicate(false, SEND_DIRECTION_S2C); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, c2s, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, c2s) { listen_and_communicate(false, SEND_DIRECTION_C2S); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, bidi, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, bidi) { listen_and_communicate(false, SEND_DIRECTION_BIDI); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_cforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_cforce) { listen_and_communicate(false, 0); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_cforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_cforce) { listen_and_communicate(false, SEND_DIRECTION_C2S); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_cforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_cforce) { listen_and_communicate(false, SEND_DIRECTION_S2C); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_cforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_cforce) { listen_and_communicate(false, SEND_DIRECTION_BIDI); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_sforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_sforce) { listen_and_communicate(false, 0); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_sforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_sforce) { listen_and_communicate(false, SEND_DIRECTION_C2S); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_sforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_sforce) { listen_and_communicate(false, SEND_DIRECTION_S2C); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE); one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_sforce, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_sforce) { listen_and_communicate(false, SEND_DIRECTION_BIDI); scoped_log_handler slh(wrap_errors_logger); one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE); @@ -967,8 +925,7 @@ class test_ucp_sockaddr_with_wakeup : public test_ucp_sockaddr { } }; -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, wakeup, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_with_wakeup, wakeup) { listen_and_communicate(true, 0); } @@ -976,8 +933,7 @@ UCS_TEST_P(test_ucp_sockaddr_with_wakeup, wakeup_c2s) { listen_and_communicate(true, SEND_DIRECTION_C2S); } -UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, wakeup_s2c, - no_close_protocol()) { +UCS_TEST_P(test_ucp_sockaddr_with_wakeup, wakeup_s2c) { listen_and_communicate(true, SEND_DIRECTION_S2C); } @@ -1040,8 +996,9 @@ class test_ucp_sockaddr_protocols : public test_ucp_sockaddr { * worker for atomic operations */ uint64_t features = UCP_FEATURE_TAG | UCP_FEATURE_STREAM | UCP_FEATURE_RMA | UCP_FEATURE_AM; - test_ucp_sockaddr::get_test_variants_mt(variants, features, - TEST_MODIFIER_CM, ""); + + add_variant_with_value(variants, features, TEST_MODIFIER_MT, + "mt", MULTI_THREAD_WORKER); } virtual void init() {