From 4fd65d41ccde6db6351ff65d4add6055975855a7 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Mon, 12 Sep 2016 20:00:56 +0300 Subject: [PATCH 1/7] UCT/API/TOOLS: Expose atomic domain - "host" or "device". --- src/tools/info/tl_info.c | 12 +++++++++--- src/uct/api/uct.h | 7 +++++++ src/uct/ib/dc/accel/dc_mlx5.c | 3 ++- src/uct/ib/dc/verbs/dc_verbs.c | 1 + src/uct/ib/rc/base/rc_iface.c | 9 ++++++--- src/uct/sm/mm/mm_iface.c | 1 + src/uct/sm/self/self_iface.c | 1 + src/uct/ugni/rdma/ugni_rdma_iface.c | 1 + 8 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/tools/info/tl_info.c b/src/tools/info/tl_info.c index 32a597beda7..8fbdcfa44f5 100644 --- a/src/tools/info/tl_info.c +++ b/src/tools/info/tl_info.c @@ -21,13 +21,19 @@ #define PRINT_ATOMIC_CAP(_name, _cap_flags) \ if ((_cap_flags) & (UCT_IFACE_FLAG_##_name##32 | UCT_IFACE_FLAG_##_name##64)) { \ char *s = strduplower(#_name); \ + char *domain = ""; \ + if ((_cap_flags) & UCT_IFACE_FLAG_ATOMIC_HOST) { \ + domain = ", host"; \ + } else if ((_cap_flags) & UCT_IFACE_FLAG_ATOMIC_DEVICE) { \ + domain = ", device"; \ + } \ if (ucs_test_all_flags(_cap_flags, \ UCT_IFACE_FLAG_##_name##32 | UCT_IFACE_FLAG_##_name##64)) \ { \ - printf("# %12s: 32, 64 bit\n", s); \ + printf("# %12s: 32, 64 bit%s\n", s, domain); \ } else { \ - printf("# %12s: %d bit\n", s, \ - ((_cap_flags) & UCT_IFACE_FLAG_##_name##32) ? 32 : 64); \ + printf("# %12s: %d bit%s\n", s, \ + ((_cap_flags) & UCT_IFACE_FLAG_##_name##32) ? 32 : 64, domain); \ } \ free(s); \ } diff --git a/src/uct/api/uct.h b/src/uct/api/uct.h index d8ae38f1923..739c81dcc5c 100644 --- a/src/uct/api/uct.h +++ b/src/uct/api/uct.h @@ -172,6 +172,13 @@ enum { UCT_IFACE_FLAG_ATOMIC_CSWAP32 = UCS_BIT(22), /**< 32bit atomic compare-and-swap */ UCT_IFACE_FLAG_ATOMIC_CSWAP64 = UCS_BIT(23), /**< 64bit atomic compare-and-swap */ + /* Atomic operations domain */ + UCT_IFACE_FLAG_ATOMIC_HOST = UCS_BIT(30), /**< Atomic communications are atomic + with respect to host operations. */ + UCT_IFACE_FLAG_ATOMIC_DEVICE = UCS_BIT(31), /**< Atomic communications are atomic + only with respect to other atomics + on the same device. */ + /* Error handling capabilities */ UCT_IFACE_FLAG_ERRHANDLE_SHORT_BUF = UCS_BIT(32), /**< Invalid buffer for short operation */ UCT_IFACE_FLAG_ERRHANDLE_BCOPY_BUF = UCS_BIT(33), /**< Invalid buffer for buffered operation */ diff --git a/src/uct/ib/dc/accel/dc_mlx5.c b/src/uct/ib/dc/accel/dc_mlx5.c index 5a5cb3b06e9..1ec2b3c59db 100644 --- a/src/uct/ib/dc/accel/dc_mlx5.c +++ b/src/uct/ib/dc/accel/dc_mlx5.c @@ -79,7 +79,8 @@ static ucs_status_t uct_dc_mlx5_iface_query(uct_iface_h tl_iface, uct_iface_attr UCT_IFACE_FLAG_ATOMIC_ADD32| UCT_IFACE_FLAG_ATOMIC_FADD32| UCT_IFACE_FLAG_ATOMIC_SWAP32| - UCT_IFACE_FLAG_ATOMIC_CSWAP32| + UCT_IFACE_FLAG_ATOMIC_CSWAP32| + UCT_IFACE_FLAG_ATOMIC_DEVICE | UCT_IFACE_FLAG_PENDING| UCT_IFACE_FLAG_AM_CB_SYNC|UCT_IFACE_FLAG_CONNECT_TO_IFACE; diff --git a/src/uct/ib/dc/verbs/dc_verbs.c b/src/uct/ib/dc/verbs/dc_verbs.c index a0852621f8f..04aa4811d96 100644 --- a/src/uct/ib/dc/verbs/dc_verbs.c +++ b/src/uct/ib/dc/verbs/dc_verbs.c @@ -76,6 +76,7 @@ static ucs_status_t uct_dc_verbs_iface_query(uct_iface_h tl_iface, uct_iface_att UCT_IFACE_FLAG_ATOMIC_FADD32| UCT_IFACE_FLAG_ATOMIC_SWAP32| UCT_IFACE_FLAG_ATOMIC_CSWAP32| + UCT_IFACE_FLAG_ATOMIC_DEVICE | UCT_IFACE_FLAG_PENDING| UCT_IFACE_FLAG_AM_CB_SYNC|UCT_IFACE_FLAG_CONNECT_TO_IFACE; diff --git a/src/uct/ib/rc/base/rc_iface.c b/src/uct/ib/rc/base/rc_iface.c index b7dab1ab11b..1cfaf5a9804 100644 --- a/src/uct/ib/rc/base/rc_iface.c +++ b/src/uct/ib/rc/base/rc_iface.c @@ -107,18 +107,21 @@ void uct_rc_iface_query(uct_rc_iface_t *iface, uct_iface_attr_t *iface_attr) if (uct_ib_atomic_is_supported(dev, 0, sizeof(uint64_t))) { iface_attr->cap.flags |= UCT_IFACE_FLAG_ATOMIC_ADD64 | UCT_IFACE_FLAG_ATOMIC_FADD64 | - UCT_IFACE_FLAG_ATOMIC_CSWAP64; + UCT_IFACE_FLAG_ATOMIC_CSWAP64 | + UCT_IFACE_FLAG_ATOMIC_DEVICE; } if (uct_ib_atomic_is_supported(dev, 1, sizeof(uint64_t))) { - iface_attr->cap.flags |= UCT_IFACE_FLAG_ATOMIC_SWAP64; + iface_attr->cap.flags |= UCT_IFACE_FLAG_ATOMIC_SWAP64 | + UCT_IFACE_FLAG_ATOMIC_DEVICE; } if (uct_ib_atomic_is_supported(dev, 1, sizeof(uint32_t))) { iface_attr->cap.flags |= UCT_IFACE_FLAG_ATOMIC_ADD32 | UCT_IFACE_FLAG_ATOMIC_FADD32 | UCT_IFACE_FLAG_ATOMIC_SWAP32 | - UCT_IFACE_FLAG_ATOMIC_CSWAP32; + UCT_IFACE_FLAG_ATOMIC_CSWAP32 | + UCT_IFACE_FLAG_ATOMIC_DEVICE; } } diff --git a/src/uct/sm/mm/mm_iface.c b/src/uct/sm/mm/mm_iface.c index 6ef3ce2a901..4af4e743de9 100644 --- a/src/uct/sm/mm/mm_iface.c +++ b/src/uct/sm/mm/mm_iface.c @@ -103,6 +103,7 @@ static ucs_status_t uct_mm_iface_query(uct_iface_h tl_iface, UCT_IFACE_FLAG_ATOMIC_SWAP32 | UCT_IFACE_FLAG_ATOMIC_CSWAP64 | UCT_IFACE_FLAG_ATOMIC_CSWAP32 | + UCT_IFACE_FLAG_ATOMIC_HOST | UCT_IFACE_FLAG_GET_BCOPY | UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_AM_BCOPY | diff --git a/src/uct/sm/self/self_iface.c b/src/uct/sm/self/self_iface.c index f5a83c837ec..4f0997058a1 100644 --- a/src/uct/sm/self/self_iface.c +++ b/src/uct/sm/self/self_iface.c @@ -45,6 +45,7 @@ static ucs_status_t uct_self_iface_query(uct_iface_h iface, uct_iface_attr_t *at UCT_IFACE_FLAG_ATOMIC_SWAP32 | UCT_IFACE_FLAG_ATOMIC_CSWAP64 | UCT_IFACE_FLAG_ATOMIC_CSWAP32 | + UCT_IFACE_FLAG_ATOMIC_HOST | UCT_IFACE_FLAG_PENDING | UCT_IFACE_FLAG_AM_CB_SYNC; diff --git a/src/uct/ugni/rdma/ugni_rdma_iface.c b/src/uct/ugni/rdma/ugni_rdma_iface.c index 236dcf082dc..03008ad73c5 100644 --- a/src/uct/ugni/rdma/ugni_rdma_iface.c +++ b/src/uct/ugni/rdma/ugni_rdma_iface.c @@ -55,6 +55,7 @@ static ucs_status_t uct_ugni_rdma_iface_query(uct_iface_h tl_iface, uct_iface_at UCT_IFACE_FLAG_ATOMIC_CSWAP64 | UCT_IFACE_FLAG_ATOMIC_FADD64 | UCT_IFACE_FLAG_ATOMIC_ADD64 | + UCT_IFACE_FLAG_ATOMIC_DEVICE | UCT_IFACE_FLAG_GET_BCOPY | UCT_IFACE_FLAG_GET_ZCOPY | UCT_IFACE_FLAG_CONNECT_TO_IFACE | From 0deca3129e0f84913a317c25ffe2112bea8f991d Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Mon, 12 Sep 2016 20:01:52 +0300 Subject: [PATCH 2/7] UCP/CONFIG: Add option to select atomic mode: "host"/"device". Selecting host-based atomics enables only transports with host-based atomics. Selecting device-based atomics picks the best local device which supports atomics to registered memory, and enables all transports on it. --- src/ucp/core/ucp_context.c | 22 +++++++- src/ucp/core/ucp_context.h | 14 +++++ src/ucp/core/ucp_worker.c | 106 +++++++++++++++++++++++++++++++++++++ src/ucp/core/ucp_worker.h | 16 ++++++ src/ucp/wireup/address.c | 16 ++++-- src/ucp/wireup/address.h | 7 +-- src/ucp/wireup/select.c | 31 +++++------ src/ucp/wireup/wireup.h | 4 ++ 8 files changed, 187 insertions(+), 29 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index f10fcdb239e..f3016c39830 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -19,6 +19,11 @@ ucp_am_handler_t ucp_am_handlers[UCP_AM_ID_LAST] = {{0, NULL, NULL}}; +static const char *ucp_atomic_modes[] = { + [UCP_ATOMIC_MODE_HOST] = "host", + [UCP_ATOMIC_MODE_DEVICE] = "device", + [UCP_ATOMIC_MODE_LAST] = NULL, +}; static ucs_config_field_t ucp_config_table[] = { {"NET_DEVICES", "all", @@ -84,12 +89,19 @@ static ucs_config_field_t ucp_config_table[] = { "Estimation of buffer copy bandwidth", ucs_offsetof(ucp_config_t, ctx.bcopy_bw), UCS_CONFIG_TYPE_MEMUNITS}, + {"ATOMIC_MODE", "device", + "Atomic operations synchronization mode.\n" + " host - operations are atomic with respect to the host processor.\n" + " device - atomic operations are performed on one of the transport devices,\n" + " and there is no atomicity guarantee with respect to the host processor.", + ucs_offsetof(ucp_config_t, ctx.atomic_mode), UCS_CONFIG_TYPE_ENUM(ucp_atomic_modes)}, + {"LOG_DATA", "0", "Size of packet data that is dumped to the log system in debug mode (0 - nothing).", ucs_offsetof(ucp_config_t, ctx.log_data_size), UCS_CONFIG_TYPE_MEMUNITS}, {"MAX_WORKER_NAME", UCS_PP_MAKE_STRING(UCP_WORKER_NAME_MAX), - "Maximal length of worker name. Affects the size of worker address.", + "Maximal length of worker name. Affects the size of worker address in debug builds.", ucs_offsetof(ucp_config_t, ctx.max_worker_name), UCS_CONFIG_TYPE_UINT}, {NULL} @@ -711,6 +723,14 @@ void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max, } } +uint64_t ucp_context_uct_atomic_iface_flags(ucp_context_h context) +{ + return ((context->config.features & UCP_FEATURE_AMO32) ? + UCP_UCT_IFACE_ATOMIC32_FLAGS : 0) | + ((context->config.features & UCP_FEATURE_AMO64) ? + UCP_UCT_IFACE_ATOMIC64_FLAGS : 0); +} + ucs_status_t ucp_context_query(ucp_context_h context, ucp_context_attr_t *attr) { attr->request_size = sizeof(ucp_request_t); diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 21fb1581ffa..6dfb220cf7d 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -67,6 +67,16 @@ enum { }; +/** + * Atomic operations mode. + */ +typedef enum { + UCP_ATOMIC_MODE_HOST, /* Use host-based atomics */ + UCP_ATOMIC_MODE_DEVICE, /* Use device-based atomics */ + UCP_ATOMIC_MODE_LAST +} ucp_atomic_mode_t; + + typedef struct ucp_context_config { /** Threshold for switching UCP to buffered copy(bcopy) protocol */ size_t bcopy_thresh; @@ -83,6 +93,8 @@ typedef struct ucp_context_config { size_t log_data_size; /** Maximal size of worker name for debugging */ unsigned max_worker_name; + /** Atomic mode */ + ucp_atomic_mode_t atomic_mode; } ucp_context_config_t; @@ -200,4 +212,6 @@ extern ucp_am_handler_t ucp_am_handlers[]; void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max, const void *data, size_t length); +uint64_t ucp_context_uct_atomic_iface_flags(ucp_context_h context); + #endif diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 138e314cdec..41ea5ad24d8 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -259,6 +259,109 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker, return status; } +static void ucp_worker_enable_atomic_tl(ucp_worker_h worker, const char *mode, + ucp_rsc_index_t rsc_index) +{ + ucs_assert(rsc_index != UCP_NULL_RESOURCE); + ucs_trace("worker %p: using %s atomics on iface[%d]=" UCT_TL_RESOURCE_DESC_FMT, + worker, mode, rsc_index, + UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[rsc_index].tl_rsc)); + worker->atomic_tls |= UCS_BIT(rsc_index); +} + +static void ucp_worker_init_host_atomics(ucp_worker_h worker) +{ + ucp_context_h context = worker->context; + ucp_rsc_index_t rsc_index; + + /* Enable all interfaces which have host-based atomics */ + for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { + if (worker->iface_attrs[rsc_index].cap.flags & UCT_IFACE_FLAG_ATOMIC_HOST) { + ucp_worker_enable_atomic_tl(worker, "host", rsc_index); + } + } +} + +static void ucp_worker_init_device_atomics(ucp_worker_h worker) +{ + ucp_context_h context = worker->context; + ucp_address_iface_attr_t dummy_iface_attr; + ucp_tl_resource_desc_t *rsc, *best_rsc; + uct_iface_attr_t *iface_attr; + ucp_rsc_index_t rsc_index; + uint64_t iface_cap_flags; + double score, best_score; + ucp_rsc_index_t md_index; + uct_md_attr_t *md_attr; + uint64_t supp_tls; + + iface_cap_flags = ucp_context_uct_atomic_iface_flags(context) | + UCT_IFACE_FLAG_ATOMIC_DEVICE; + + dummy_iface_attr.bandwidth = 1e12; + dummy_iface_attr.cap_flags = -1; + dummy_iface_attr.overhead = 0; + dummy_iface_attr.priority = 0; + + supp_tls = 0; + best_score = -1; + best_rsc = NULL; + + /* Select best interface for atomics device */ + for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { + rsc = &context->tl_rscs[rsc_index]; + md_index = rsc->md_index; + md_attr = &context->md_attrs[md_index]; + iface_attr = &worker->iface_attrs[rsc_index]; + + if (!(md_attr->cap.flags & UCT_MD_FLAG_REG) || + !ucs_test_all_flags(iface_attr->cap.flags, iface_cap_flags)) + { + continue; + } + + supp_tls |= UCS_BIT(rsc_index); + + score = ucp_wireup_amo_score_func(md_attr, iface_attr, &dummy_iface_attr); + if (score > best_score) { + best_rsc = rsc; + best_score = score; + } + } + + if (best_rsc == NULL) { + ucs_debug("worker %p: no support for atomics", worker); + return; + } + + /* Enable atomics on all resources using same device as the "best" resource */ + for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { + rsc = &context->tl_rscs[rsc_index]; + if ((supp_tls & UCS_BIT(rsc_index)) && + (rsc->md_index == best_rsc->md_index) && + !strncmp(rsc->tl_rsc.dev_name, best_rsc->tl_rsc.dev_name, + UCT_DEVICE_NAME_MAX)) + { + ucp_worker_enable_atomic_tl(worker, "device", rsc_index); + } + } +} + +static void ucp_worker_init_atomic_tls(ucp_worker_h worker) +{ + ucp_context_h context = worker->context; + + worker->atomic_tls = 0; + + if (context->config.features & (UCP_FEATURE_AMO32|UCP_FEATURE_AMO64)) { + if (context->config.ext.atomic_mode == UCP_ATOMIC_MODE_HOST) { + ucp_worker_init_host_atomics(worker); + } else if (context->config.ext.atomic_mode == UCP_ATOMIC_MODE_DEVICE) { + ucp_worker_init_device_atomics(worker); + } + } +} + /* All the ucp endpoints will share the configurations. No need for every ep to * have it's own configuration (to save memory footprint). Same config can be used * by different eps. @@ -378,6 +481,9 @@ ucs_status_t ucp_worker_create(ucp_context_h context, ucs_thread_mode_t thread_m } } + /* Select atomic resources */ + ucp_worker_init_atomic_tls(worker); + *worker_p = worker; return UCS_OK; diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index d35bd9a0439..b8cfa163cec 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -16,6 +16,21 @@ KHASH_MAP_INIT_INT64(ucp_worker_ep_hash, ucp_ep_t *); + +enum { + UCP_UCT_IFACE_ATOMIC32_FLAGS = + UCT_IFACE_FLAG_ATOMIC_ADD32 | + UCT_IFACE_FLAG_ATOMIC_FADD32 | + UCT_IFACE_FLAG_ATOMIC_SWAP32 | + UCT_IFACE_FLAG_ATOMIC_CSWAP32, + UCP_UCT_IFACE_ATOMIC64_FLAGS = + UCT_IFACE_FLAG_ATOMIC_ADD64 | + UCT_IFACE_FLAG_ATOMIC_FADD64 | + UCT_IFACE_FLAG_ATOMIC_SWAP64 | + UCT_IFACE_FLAG_ATOMIC_CSWAP64 +}; + + /** * UCP worker wake-up context. */ @@ -36,6 +51,7 @@ typedef struct ucp_worker { uct_worker_h uct; /* UCT worker handle */ ucs_mpool_t req_mp; /* Memory pool for requests */ ucp_worker_wakeup_t wakeup; /* Wakeup-related context */ + uint64_t atomic_tls; /* Which resources can be used for atomics */ int inprogress; char name[UCP_WORKER_NAME_MAX]; /* Worker name */ diff --git a/src/ucp/wireup/address.c b/src/ucp/wireup/address.c index d6f887b8c9f..ca27c62b37e 100644 --- a/src/ucp/wireup/address.c +++ b/src/ucp/wireup/address.c @@ -137,7 +137,7 @@ ucp_address_gather_devices(ucp_worker_h worker, uint64_t tl_bitmap, int has_ep, iface_attr = &worker->iface_attrs[i]; if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) { - dev->tl_addrs_size += iface_attr->iface_addr_len; + dev->tl_addrs_size += iface_attr->iface_addr_len; } else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) { if (has_ep) { dev->tl_addrs_size += iface_attr->ep_addr_len; @@ -219,11 +219,18 @@ ucp_address_pack_ep_address(ucp_ep_h ep, ucp_rsc_index_t tl_index, } static void ucp_address_pack_iface_attr(ucp_address_packed_iface_attr_t *packed, - const uct_iface_attr_t *iface_attr) + const uct_iface_attr_t *iface_attr, + int enable_atomics) { uint32_t packed_flag; + uint64_t cap_flags; uint64_t bit; + cap_flags = iface_attr->cap.flags; + if (!enable_atomics) { + cap_flags &= ~(UCP_UCT_IFACE_ATOMIC32_FLAGS | UCP_UCT_IFACE_ATOMIC64_FLAGS); + } + packed->prio_cap_flags = ((uint8_t)iface_attr->priority); packed->overhead = iface_attr->overhead; packed->bandwidth = iface_attr->bandwidth; @@ -233,7 +240,7 @@ static void ucp_address_pack_iface_attr(ucp_address_packed_iface_attr_t *packed, bit = 1; while (UCP_ADDRESS_IFACE_FLAGS & ~(bit - 1)) { if (UCP_ADDRESS_IFACE_FLAGS & bit) { - if (iface_attr->cap.flags & bit) { + if (cap_flags & bit) { packed->prio_cap_flags |= packed_flag; } packed_flag <<= 1; @@ -336,7 +343,8 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep, ptr += sizeof(uint16_t); /* Transport information */ - ucp_address_pack_iface_attr(ptr, &worker->iface_attrs[i]); + ucp_address_pack_iface_attr(ptr, &worker->iface_attrs[i], + worker->atomic_tls & UCS_BIT(i)); ptr += sizeof(ucp_address_packed_iface_attr_t); /* Transport address length */ diff --git a/src/ucp/wireup/address.h b/src/ucp/wireup/address.h index 432963da3d1..c856232b408 100644 --- a/src/ucp/wireup/address.h +++ b/src/ucp/wireup/address.h @@ -24,11 +24,8 @@ enum { UCT_IFACE_FLAG_PUT_BCOPY | UCT_IFACE_FLAG_GET_BCOPY | UCT_IFACE_FLAG_GET_ZCOPY | - UCT_IFACE_FLAG_ATOMIC_ADD32 | - UCT_IFACE_FLAG_ATOMIC_FADD32 | - UCT_IFACE_FLAG_ATOMIC_SWAP32 | - UCT_IFACE_FLAG_ATOMIC_CSWAP32 | - UCT_IFACE_FLAG_ATOMIC_ADD64 | + UCP_UCT_IFACE_ATOMIC32_FLAGS | + UCP_UCT_IFACE_ATOMIC64_FLAGS | UCT_IFACE_FLAG_ATOMIC_FADD64 | UCT_IFACE_FLAG_ATOMIC_SWAP64 | UCT_IFACE_FLAG_ATOMIC_CSWAP64 | diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index e928984b3d4..54f8c05d67d 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -184,11 +184,16 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list addr_index_map |= UCS_BIT(addr_index); } + if (!addr_index_map) { + snprintf(p, endp - p, "not supported by peer "); + p += strlen(p); + } + /* For each local resource try to find the best remote address to connect to. * Pick the best local resource to satisfy the criteria. * best one has the highest score (from the dedicated score_func) and * has a reachable tl on the remote peer */ - for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { + for (rsc_index = 0; addr_index_map && (rsc_index < context->num_tls); ++rsc_index) { resource = &context->tl_rscs[rsc_index].tl_rsc; iface_attr = &worker->iface_attrs[rsc_index]; md_attr = &context->md_attrs[context->tl_rscs[rsc_index].md_index]; @@ -245,7 +250,7 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list * debug message. */ if (!reachable) { - snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - cannot reach remote worker, ", + snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - cannot reach peer, ", UCT_TL_RESOURCE_DESC_ARG(resource)); p += strlen(p); } @@ -460,9 +465,9 @@ static ucs_status_t ucp_wireup_add_rma_lanes(ucp_ep_h ep, unsigned address_count UCP_WIREUP_LANE_USAGE_RMA); } -static double ucp_wireup_amo_score_func(const uct_md_attr_t *md_attr, - const uct_iface_attr_t *iface_attr, - const ucp_address_iface_attr_t *remote_iface_attr) +double ucp_wireup_amo_score_func(const uct_md_attr_t *md_attr, + const uct_iface_attr_t *iface_attr, + const ucp_address_iface_attr_t *remote_iface_attr) { /* best one-sided latency */ return 1e-3 / (iface_attr->latency + iface_attr->overhead); @@ -475,20 +480,8 @@ static ucs_status_t ucp_wireup_add_amo_lanes(ucp_ep_h ep, unsigned address_count { ucp_wireup_criteria_t criteria; - criteria.remote_iface_flags = 0; - - if (ucp_ep_get_context_features(ep) & UCP_FEATURE_AMO32) { - criteria.remote_iface_flags |= UCT_IFACE_FLAG_ATOMIC_ADD32 | - UCT_IFACE_FLAG_ATOMIC_FADD32 | - UCT_IFACE_FLAG_ATOMIC_SWAP32 | - UCT_IFACE_FLAG_ATOMIC_CSWAP32; - } - if (ucp_ep_get_context_features(ep) & UCP_FEATURE_AMO64) { - criteria.remote_iface_flags |= UCT_IFACE_FLAG_ATOMIC_ADD64 | - UCT_IFACE_FLAG_ATOMIC_FADD64 | - UCT_IFACE_FLAG_ATOMIC_SWAP64 | - UCT_IFACE_FLAG_ATOMIC_CSWAP64; - } + criteria.remote_iface_flags = + ucp_context_uct_atomic_iface_flags(ep->worker->context); if (criteria.remote_iface_flags == 0) { return UCS_OK; } diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index b3c5062fb24..1e6b12f197c 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -71,6 +71,10 @@ ucs_status_t ucp_wireup_select_aux_transport(ucp_ep_h ep, ucp_rsc_index_t *rsc_index_p, unsigned *addr_index_p); +double ucp_wireup_amo_score_func(const uct_md_attr_t *md_attr, + const uct_iface_attr_t *iface_attr, + const ucp_address_iface_attr_t *remote_iface_attr); + ucs_status_t ucp_wireup_msg_progress(uct_pending_req_t *self); ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned address_count, From 37386019b3cf234d2cbd2ba764cb9a033c8d825a Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Mon, 12 Sep 2016 20:04:14 +0300 Subject: [PATCH 3/7] TEST: Fix UCP context features for RMA/AMO tests. --- test/gtest/ucp/test_ucp_atomic.cc | 4 ++-- test/gtest/ucp/test_ucp_memheap.cc | 5 ----- test/gtest/ucp/test_ucp_memheap.h | 2 -- test/gtest/ucp/test_ucp_mmap.cc | 7 ++++++- test/gtest/ucp/test_ucp_rma.cc | 6 +++++- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/test/gtest/ucp/test_ucp_atomic.cc b/test/gtest/ucp/test_ucp_atomic.cc index 351a903b9e8..724f547336c 100644 --- a/test/gtest/ucp/test_ucp_atomic.cc +++ b/test/gtest/ucp/test_ucp_atomic.cc @@ -147,7 +147,7 @@ class test_ucp_atomic : public test_ucp_memheap { class test_ucp_atomic32 : public test_ucp_atomic { public: static ucp_params_t get_ctx_params() { - ucp_params_t params = test_ucp_memheap::get_ctx_params(); + ucp_params_t params = ucp_test::get_ctx_params(); params.features |= UCP_FEATURE_AMO32; return params; } @@ -178,7 +178,7 @@ UCP_INSTANTIATE_TEST_CASE(test_ucp_atomic32) class test_ucp_atomic64 : public test_ucp_atomic { public: static ucp_params_t get_ctx_params() { - ucp_params_t params = test_ucp_memheap::get_ctx_params(); + ucp_params_t params = ucp_test::get_ctx_params(); params.features |= UCP_FEATURE_AMO64; return params; } diff --git a/test/gtest/ucp/test_ucp_memheap.cc b/test/gtest/ucp/test_ucp_memheap.cc index da4afc6cf7f..ca13f5b46cf 100644 --- a/test/gtest/ucp/test_ucp_memheap.cc +++ b/test/gtest/ucp/test_ucp_memheap.cc @@ -174,8 +174,3 @@ void test_ucp_memheap::test_blocking_xfer(blocking_send_func_t send, size_t alig } } -ucp_params_t test_ucp_memheap::get_ctx_params() { - ucp_params_t params = ucp_test::get_ctx_params(); - params.features |= UCP_FEATURE_RMA; - return params; -} diff --git a/test/gtest/ucp/test_ucp_memheap.h b/test/gtest/ucp/test_ucp_memheap.h index 74fecc52c61..6260a882b25 100644 --- a/test/gtest/ucp/test_ucp_memheap.h +++ b/test/gtest/ucp/test_ucp_memheap.h @@ -45,8 +45,6 @@ class test_ucp_memheap : public ucp_test { bool malloc_allocate, bool is_ep_flush); void test_nonblocking_implicit_stream_xfer(nonblocking_send_func_t send, size_t alignment, bool malloc_allocate, bool is_ep_flush); - - static ucp_params_t get_ctx_params(); }; diff --git a/test/gtest/ucp/test_ucp_mmap.cc b/test/gtest/ucp/test_ucp_mmap.cc index 18b9e8a7918..7f54da55a26 100644 --- a/test/gtest/ucp/test_ucp_mmap.cc +++ b/test/gtest/ucp/test_ucp_mmap.cc @@ -9,7 +9,12 @@ class test_ucp_mmap : public test_ucp_memheap { public: - using test_ucp_memheap::get_ctx_params; + static ucp_params_t get_ctx_params() { + ucp_params_t params = ucp_test::get_ctx_params(); + params.features |= UCP_FEATURE_RMA; + return params; + } + protected: void test_rkey_management(entity *e, ucp_mem_h memh, bool is_dummy); }; diff --git a/test/gtest/ucp/test_ucp_rma.cc b/test/gtest/ucp/test_ucp_rma.cc index 0abc9323ac6..08927faa0ba 100644 --- a/test/gtest/ucp/test_ucp_rma.cc +++ b/test/gtest/ucp/test_ucp_rma.cc @@ -10,7 +10,11 @@ class test_ucp_rma : public test_ucp_memheap { public: - using test_ucp_memheap::get_ctx_params; + static ucp_params_t get_ctx_params() { + ucp_params_t params = ucp_test::get_ctx_params(); + params.features |= UCP_FEATURE_RMA; + return params; + } void nonblocking_put_nbi(entity *e, size_t max_size, void *memheap_addr, From de7f572334466079fa49d3e4a162c6367f960579 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 13 Sep 2016 21:29:29 +0300 Subject: [PATCH 4/7] TEST: Add generate_test_params_variant function. --- test/gtest/ucp/ucp_test.cc | 19 +++++++++++++++++++ test/gtest/ucp/ucp_test.h | 13 +++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/test/gtest/ucp/ucp_test.cc b/test/gtest/ucp/ucp_test.cc index 92c45beda10..82d8a3ce0a8 100644 --- a/test/gtest/ucp/ucp_test.cc +++ b/test/gtest/ucp/ucp_test.cc @@ -113,6 +113,25 @@ ucp_test::enum_test_params(const ucp_params_t& ctx_params, } } +void ucp_test::generate_test_params_variant(const ucp_params_t& ctx_params, + const std::string& name, + const std::string& test_case_name, + const std::string& tls, + int variant, + std::vector& test_params) +{ + std::vector tmp_test_params, result; + + tmp_test_params = ucp_test::enum_test_params(ctx_params, name, + test_case_name, tls); + for (std::vector::iterator iter = tmp_test_params.begin(); + iter != tmp_test_params.end(); ++iter) + { + iter->variant = variant; + test_params.push_back(*iter); + } +} + void ucp_test::set_ucp_config(ucp_config_t *config, const ucp_test_param& test_param) { diff --git a/test/gtest/ucp/ucp_test.h b/test/gtest/ucp/ucp_test.h index e912760bea1..841a71e0739 100644 --- a/test/gtest/ucp/ucp_test.h +++ b/test/gtest/ucp/ucp_test.h @@ -15,7 +15,7 @@ extern "C" { struct ucp_test_param { ucp_params_t ctx_params; std::vector transports; - bool variant; + int variant; }; class ucp_test_base : public ucs::test_base { @@ -73,13 +73,22 @@ class ucp_test : public ucp_test_base, const std::string& test_case_name, const std::string& tls); + static ucp_params_t get_ctx_params(); + + static void + generate_test_params_variant(const ucp_params_t& ctx_params, + const std::string& name, + const std::string& test_case_name, + const std::string& tls, + int variant, + std::vector& test_params); + virtual void modify_config(const std::string& name, const std::string& value); protected: virtual void init(); virtual void cleanup(); ucp_test_base::entity* create_entity(bool add_in_front = false); - static ucp_params_t get_ctx_params(); void progress() const; void short_progress_loop() const; static void disable_errors(); From e0baf31cef5bc685b00f77a139d85df68bf6b713 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 13 Sep 2016 17:57:04 +0300 Subject: [PATCH 5/7] TEST/ATOMIC: Run atomic tests in host and device modes. --- test/gtest/Makefile.am | 1 + test/gtest/ucp/test_ucp_atomic.cc | 276 ++++++++++++++++-------------- test/gtest/ucp/test_ucp_atomic.h | 48 ++++++ test/gtest/ucp/test_ucp_fence.cc | 4 +- 4 files changed, 200 insertions(+), 129 deletions(-) create mode 100644 test/gtest/ucp/test_ucp_atomic.h diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am index b40810e0e1d..b17b9fc880c 100644 --- a/test/gtest/Makefile.am +++ b/test/gtest/Makefile.am @@ -150,6 +150,7 @@ noinst_HEADERS = \ uct/uct_test.h \ uct/ud_base.h \ \ + ucp/test_ucp_atomic.h \ ucp/test_ucp_memheap.h \ ucp/test_ucp_tag.h \ ucp/ucp_test.h diff --git a/test/gtest/ucp/test_ucp_atomic.cc b/test/gtest/ucp/test_ucp_atomic.cc index 724f547336c..3d1ce254943 100644 --- a/test/gtest/ucp/test_ucp_atomic.cc +++ b/test/gtest/ucp/test_ucp_atomic.cc @@ -4,145 +4,167 @@ * See file LICENSE for terms. */ -#include "test_ucp_memheap.h" +#include "test_ucp_atomic.h" +extern "C" { +#include +} -class test_ucp_atomic : public test_ucp_memheap { -public: - template - void blocking_add(entity *e, size_t max_size, void *memheap_addr, - ucp_rkey_h rkey, std::string& expected_data) - { - ucs_status_t status; - T add, prev; - - prev = *(T*)memheap_addr; - add = (T)rand() * (T)rand(); - - if (sizeof(T) == sizeof(uint32_t)) { - status = ucp_atomic_add32(e->ep(), add, (uintptr_t)memheap_addr, rkey); - } else if (sizeof(T) == sizeof(uint64_t)) { - status = ucp_atomic_add64(e->ep(), add, (uintptr_t)memheap_addr, rkey); - } else { - status = UCS_ERR_UNSUPPORTED; - } - ASSERT_UCS_OK(status); - - expected_data.resize(sizeof(T)); - *(T*)&expected_data[0] = add + prev; - } +std::vector +test_ucp_atomic::enum_test_params(const ucp_params_t& ctx_params, + const std::string& name, + const std::string& test_case_name, + const std::string& tls) +{ + std::vector result; + generate_test_params_variant(ctx_params, name, test_case_name, tls, + UCP_ATOMIC_MODE_HOST, result); + generate_test_params_variant(ctx_params, name, test_case_name, tls, + UCP_ATOMIC_MODE_DEVICE, result); + return result; +} + +void test_ucp_atomic::init() { + const char *atomic_mode = + (GetParam().variant == UCP_ATOMIC_MODE_HOST) ? "host" : + (GetParam().variant == UCP_ATOMIC_MODE_DEVICE) ? "device" : + ""; + modify_config("ATOMIC_MODE", atomic_mode); + test_ucp_memheap::init(); +} - template void blocking_add(entity *e, size_t max_size, void *memheap_addr, - ucp_rkey_h rkey, std::string& expected_data); - - void unaligned_blocking_add64(entity *e, size_t max_size, void *memheap_addr, - ucp_rkey_h rkey, std::string& expected_data) - { - /* Test that unaligned addresses generate error */ - ucs_status_t status; - status = ucp_atomic_add64(e->ep(), 0, (uintptr_t)memheap_addr + 1, rkey); - EXPECT_EQ(UCS_ERR_INVALID_PARAM, status); - expected_data.clear(); +template +void test_ucp_atomic::blocking_add(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data) +{ + ucs_status_t status; + T add, prev; + + prev = *(T*)memheap_addr; + add = (T)rand() * (T)rand(); + + if (sizeof(T) == sizeof(uint32_t)) { + status = ucp_atomic_add32(e->ep(), add, (uintptr_t)memheap_addr, rkey); + } else if (sizeof(T) == sizeof(uint64_t)) { + status = ucp_atomic_add64(e->ep(), add, (uintptr_t)memheap_addr, rkey); + } else { + status = UCS_ERR_UNSUPPORTED; } + ASSERT_UCS_OK(status); - template - void blocking_fadd(entity *e, size_t max_size, void *memheap_addr, - ucp_rkey_h rkey, std::string& expected_data) - { - ucs_status_t status; - T add, prev, result; - - prev = *(T*)memheap_addr; - add = (T)rand() * (T)rand(); - - if (sizeof(T) == sizeof(uint32_t)) { - status = ucp_atomic_fadd32(e->ep(), add, (uintptr_t)memheap_addr, rkey, - (uint32_t*)(void*)&result); - } else if (sizeof(T) == sizeof(uint64_t)) { - status = ucp_atomic_fadd64(e->ep(), add, (uintptr_t)memheap_addr, rkey, - (uint64_t*)(void*)&result); - } else { - status = UCS_ERR_UNSUPPORTED; - } - ASSERT_UCS_OK(status); - - EXPECT_EQ(prev, result); - - expected_data.resize(sizeof(T)); - *(T*)&expected_data[0] = add + prev; + expected_data.resize(sizeof(T)); + *(T*)&expected_data[0] = add + prev; +} + +void test_ucp_atomic::unaligned_blocking_add64(entity *e, size_t max_size, + void *memheap_addr, ucp_rkey_h rkey, + std::string& expected_data) +{ + /* Test that unaligned addresses generate error */ + ucs_status_t status; + status = ucp_atomic_add64(e->ep(), 0, (uintptr_t)memheap_addr + 1, rkey); + EXPECT_EQ(UCS_ERR_INVALID_PARAM, status); + expected_data.clear(); +} + +template +void test_ucp_atomic::blocking_fadd(entity *e, size_t max_size, + void *memheap_addr, ucp_rkey_h rkey, + std::string& expected_data) +{ + ucs_status_t status; + T add, prev, result; + + prev = *(T*)memheap_addr; + add = (T)rand() * (T)rand(); + + if (sizeof(T) == sizeof(uint32_t)) { + status = ucp_atomic_fadd32(e->ep(), add, (uintptr_t)memheap_addr, rkey, + (uint32_t*)(void*)&result); + } else if (sizeof(T) == sizeof(uint64_t)) { + status = ucp_atomic_fadd64(e->ep(), add, (uintptr_t)memheap_addr, rkey, + (uint64_t*)(void*)&result); + } else { + status = UCS_ERR_UNSUPPORTED; } + ASSERT_UCS_OK(status); - template - void blocking_swap(entity *e, size_t max_size, void *memheap_addr, - ucp_rkey_h rkey, std::string& expected_data) - { - ucs_status_t status; - T swap, prev, result; - - prev = *(T*)memheap_addr; - swap = (T)rand() * (T)rand(); - - if (sizeof(T) == sizeof(uint32_t)) { - status = ucp_atomic_swap32(e->ep(), swap, (uintptr_t)memheap_addr, - rkey, (uint32_t*)(void*)&result); - } else if (sizeof(T) == sizeof(uint64_t)) { - status = ucp_atomic_swap64(e->ep(), swap, (uintptr_t)memheap_addr, - rkey, (uint64_t*)(void*)&result); - } else { - status = UCS_ERR_UNSUPPORTED; - } - ASSERT_UCS_OK(status); - - EXPECT_EQ(prev, result); - - expected_data.resize(sizeof(T)); - *(T*)&expected_data[0] = swap; + EXPECT_EQ(prev, result); + + expected_data.resize(sizeof(T)); + *(T*)&expected_data[0] = add + prev; +} + +template +void test_ucp_atomic::blocking_swap(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data) +{ + ucs_status_t status; + T swap, prev, result; + + prev = *(T*)memheap_addr; + swap = (T)rand() * (T)rand(); + + if (sizeof(T) == sizeof(uint32_t)) { + status = ucp_atomic_swap32(e->ep(), swap, (uintptr_t)memheap_addr, + rkey, (uint32_t*)(void*)&result); + } else if (sizeof(T) == sizeof(uint64_t)) { + status = ucp_atomic_swap64(e->ep(), swap, (uintptr_t)memheap_addr, + rkey, (uint64_t*)(void*)&result); + } else { + status = UCS_ERR_UNSUPPORTED; } + ASSERT_UCS_OK(status); + + EXPECT_EQ(prev, result); - template - void blocking_cswap(entity *e, size_t max_size, void *memheap_addr, - ucp_rkey_h rkey, std::string& expected_data) - { - ucs_status_t status; - T compare, swap, prev, result; - - prev = *(T*)memheap_addr; - if ((rand() % 2) == 0) { - compare = prev; /* success mode */ - } else { - compare = ~prev; /* fail mode */ - } - swap = (T)rand() * (T)rand(); - - if (sizeof(T) == sizeof(uint32_t)) { - status = ucp_atomic_cswap32(e->ep(), compare, swap, - (uintptr_t)memheap_addr, rkey, - (uint32_t*)(void*)&result); - } else if (sizeof(T) == sizeof(uint64_t)) { - status = ucp_atomic_cswap64(e->ep(), compare, swap, - (uintptr_t)memheap_addr, rkey, - (uint64_t*)(void*)&result); - } else { - status = UCS_ERR_UNSUPPORTED; - } - ASSERT_UCS_OK(status); - - EXPECT_EQ(prev, result); - - expected_data.resize(sizeof(T)); - if (compare == prev) { - *(T*)&expected_data[0] = swap; - } else { - *(T*)&expected_data[0] = prev; - } + expected_data.resize(sizeof(T)); + *(T*)&expected_data[0] = swap; +} + +template +void test_ucp_atomic::blocking_cswap(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data) +{ + ucs_status_t status; + T compare, swap, prev, result; + + prev = *(T*)memheap_addr; + if ((rand() % 2) == 0) { + compare = prev; /* success mode */ + } else { + compare = ~prev; /* fail mode */ + } + swap = (T)rand() * (T)rand(); + + if (sizeof(T) == sizeof(uint32_t)) { + status = ucp_atomic_cswap32(e->ep(), compare, swap, + (uintptr_t)memheap_addr, rkey, + (uint32_t*)(void*)&result); + } else if (sizeof(T) == sizeof(uint64_t)) { + status = ucp_atomic_cswap64(e->ep(), compare, swap, + (uintptr_t)memheap_addr, rkey, + (uint64_t*)(void*)&result); + } else { + status = UCS_ERR_UNSUPPORTED; } + ASSERT_UCS_OK(status); - template - void test(F f, bool malloc_allocate) { - test_blocking_xfer(static_cast(f), sizeof(T), - malloc_allocate, false); + EXPECT_EQ(prev, result); + + expected_data.resize(sizeof(T)); + if (compare == prev) { + *(T*)&expected_data[0] = swap; + } else { + *(T*)&expected_data[0] = prev; } +} + +template +void test_ucp_atomic::test(F f, bool malloc_allocate) { + test_blocking_xfer(static_cast(f), sizeof(T), + malloc_allocate, false); +} -}; class test_ucp_atomic32 : public test_ucp_atomic { public: diff --git a/test/gtest/ucp/test_ucp_atomic.h b/test/gtest/ucp/test_ucp_atomic.h new file mode 100644 index 00000000000..3433a5b5e08 --- /dev/null +++ b/test/gtest/ucp/test_ucp_atomic.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) Mellanox Technologies Ltd. 2001-2016. ALL RIGHTS RESERVED. + * + * See file LICENSE for terms. + */ + +#ifndef TEST_UCP_ATOMIC_H_ +#define TEST_UCP_ATOMIC_H_ + +#include "test_ucp_memheap.h" + + +class test_ucp_atomic : public test_ucp_memheap { +public: + static std::vector enum_test_params(const ucp_params_t& ctx_params, + const std::string& name, + const std::string& test_case_name, + const std::string& tls); + + virtual void init(); + + template + void blocking_add(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data); + + template void blocking_add(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data); + + void unaligned_blocking_add64(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data); + + template + void blocking_fadd(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data); + + template + void blocking_swap(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data); + + template + void blocking_cswap(entity *e, size_t max_size, void *memheap_addr, + ucp_rkey_h rkey, std::string& expected_data); + + template + void test(F f, bool malloc_allocate); +}; + +#endif diff --git a/test/gtest/ucp/test_ucp_fence.cc b/test/gtest/ucp/test_ucp_fence.cc index cfe48aa9461..bb7f1ddd1df 100644 --- a/test/gtest/ucp/test_ucp_fence.cc +++ b/test/gtest/ucp/test_ucp_fence.cc @@ -4,9 +4,9 @@ * See file LICENSE for terms. */ -#include "ucp_test.h" +#include "test_ucp_atomic.h" -class test_ucp_fence : public ucp_test { +class test_ucp_fence : public test_ucp_atomic { public: typedef void (test_ucp_fence::* send_func_t)(entity *e, uint64_t *initial_buf, uint64_t *result_buf, void *memheap_addr, From fd6545796167fbc88b98272eae48722cf11850f0 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Wed, 14 Sep 2016 01:47:40 +0300 Subject: [PATCH 6/7] UCP: Fix atomic selection across multiple devices. + p2p transports can use only the devices which are selected for inbound atomic operations, otherwise the remote peer would not be able to connect. + use priority mechanism in atomic device selection. --- src/ucp/core/ucp_worker.c | 12 +++++++--- src/ucp/wireup/select.c | 50 +++++++++++++++++++++++++++++---------- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 41ea5ad24d8..dcf362a96ca 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -294,6 +294,7 @@ static void ucp_worker_init_device_atomics(ucp_worker_h worker) ucp_rsc_index_t md_index; uct_md_attr_t *md_attr; uint64_t supp_tls; + uint8_t priority, best_priority; iface_cap_flags = ucp_context_uct_atomic_iface_flags(context) | UCT_IFACE_FLAG_ATOMIC_DEVICE; @@ -306,6 +307,7 @@ static void ucp_worker_init_device_atomics(ucp_worker_h worker) supp_tls = 0; best_score = -1; best_rsc = NULL; + best_priority = 0; /* Select best interface for atomics device */ for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { @@ -321,11 +323,15 @@ static void ucp_worker_init_device_atomics(ucp_worker_h worker) } supp_tls |= UCS_BIT(rsc_index); + priority = iface_attr->priority; score = ucp_wireup_amo_score_func(md_attr, iface_attr, &dummy_iface_attr); - if (score > best_score) { - best_rsc = rsc; - best_score = score; + if ((score > best_score) || + ((score == best_score) && (priority > best_priority))) + { + best_rsc = rsc; + best_score = score; + best_priority = priority; } } diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 54f8c05d67d..205bbdb60c8 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -125,7 +125,7 @@ static int ucp_wireup_is_reachable(ucp_worker_h worker, ucp_rsc_index_t rsc_inde static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list, unsigned address_count, const ucp_wireup_criteria_t *criteria, - uint64_t remote_md_map, int show_error, + uint64_t tl_bitmap, uint64_t remote_md_map, int show_error, ucp_rsc_index_t *rsc_index_p, unsigned *dst_addr_index_p, double *score_p) { @@ -212,6 +212,16 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list continue; } + /* Check supplied tl bitmap */ + if (!(tl_bitmap & UCS_BIT(rsc_index))) { + ucs_trace(UCT_TL_RESOURCE_DESC_FMT " : disabled by tl_bitmap", + UCT_TL_RESOURCE_DESC_ARG(resource)); + snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - disabled for %s, ", + UCT_TL_RESOURCE_DESC_ARG(resource), criteria->title); + p += strlen(p); + continue; + } + reachable = 0; for (ae = address_list; ae < address_list + address_count; ++ae) { @@ -349,7 +359,7 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t *num_lanes_p, const ucp_wireup_criteria_t *criteria, - uint32_t usage) + uint64_t tl_bitmap, uint32_t usage) { ucp_wireup_criteria_t mem_criteria = *criteria; ucp_address_entry_t *address_list_copy; @@ -378,8 +388,8 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, mem_criteria.title = title; mem_criteria.remote_md_flags = UCT_MD_FLAG_REG; status = ucp_wireup_select_transport(ep, address_list_copy, address_count, - &mem_criteria, remote_md_map, 1, - &rsc_index, &addr_index, &score); + &mem_criteria, tl_bitmap, remote_md_map, + 1, &rsc_index, &addr_index, &score); if (status != UCS_OK) { goto out_free_address_list; } @@ -404,8 +414,8 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, while (address_count > 0) { status = ucp_wireup_select_transport(ep, address_list_copy, address_count, - &mem_criteria, remote_md_map, 0, - &rsc_index, &addr_index, &score); + &mem_criteria, tl_bitmap, remote_md_map, + 0, &rsc_index, &addr_index, &score); if ((status != UCS_OK) || (score <= reg_score)) { break; } @@ -462,7 +472,7 @@ static ucs_status_t ucp_wireup_add_rma_lanes(ucp_ep_h ep, unsigned address_count return ucp_wireup_add_memaccess_lanes(ep, address_count, address_list, lane_descs, num_lanes_p, &criteria, - UCP_WIREUP_LANE_USAGE_RMA); + -1, UCP_WIREUP_LANE_USAGE_RMA); } double ucp_wireup_amo_score_func(const uct_md_attr_t *md_attr, @@ -478,10 +488,13 @@ static ucs_status_t ucp_wireup_add_amo_lanes(ucp_ep_h ep, unsigned address_count ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t *num_lanes_p) { + ucp_worker_h worker = ep->worker; + ucp_context_h context = worker->context; ucp_wireup_criteria_t criteria; + ucp_rsc_index_t rsc_index; + uint64_t tl_bitmap; - criteria.remote_iface_flags = - ucp_context_uct_atomic_iface_flags(ep->worker->context); + criteria.remote_iface_flags = ucp_context_uct_atomic_iface_flags(context); if (criteria.remote_iface_flags == 0) { return UCS_OK; } @@ -493,9 +506,20 @@ static ucs_status_t ucp_wireup_add_amo_lanes(ucp_ep_h ep, unsigned address_count UCT_IFACE_FLAG_PENDING; criteria.calc_score = ucp_wireup_amo_score_func; + /* We can use only non-p2p resources or resources which are explicitly + * selected for atomics. Otherwise, the remote peer would not be able to + * connect back on p2p transport. + */ + tl_bitmap = worker->atomic_tls; + for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { + if (!ucp_worker_is_tl_p2p(worker, rsc_index)) { + tl_bitmap |= UCS_BIT(rsc_index); + } + } + return ucp_wireup_add_memaccess_lanes(ep, address_count, address_list, lane_descs, num_lanes_p, &criteria, - UCP_WIREUP_LANE_USAGE_AMO); + tl_bitmap, UCP_WIREUP_LANE_USAGE_AMO); } static double ucp_wireup_am_score_func(const uct_md_attr_t *md_attr, @@ -554,7 +578,7 @@ static ucs_status_t ucp_wireup_add_am_lane(ucp_ep_h ep, unsigned address_count, } status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, - -1, 1, &rsc_index, &addr_index, &score); + -1, -1, 1, &rsc_index, &addr_index, &score); if (status != UCS_OK) { return status; } @@ -594,7 +618,7 @@ static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, unsigned address_count } status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, - -1, 0, &rsc_index, &addr_index, &score); + -1, -1, 0, &rsc_index, &addr_index, &score); if ((status == UCS_OK) && /* a temporary workaround to prevent the ugni uct from using rndv */ (strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") == NULL)) { @@ -783,6 +807,6 @@ ucs_status_t ucp_wireup_select_aux_transport(ucp_ep_h ep, { double score; return ucp_wireup_select_transport(ep, address_list, address_count, - &ucp_wireup_aux_criteria, -1, 1, + &ucp_wireup_aux_criteria, -1, -1, 1, rsc_index_p, addr_index_p, &score); } From ad5a6f5de69e4deee203dda4823170a656007445 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Wed, 14 Sep 2016 22:28:05 +0300 Subject: [PATCH 7/7] UCP/UCT: Rename host atomics to cpu atomics. --- src/tools/info/tl_info.c | 4 ++-- src/ucp/core/ucp_context.c | 6 +++--- src/ucp/core/ucp_context.h | 2 +- src/ucp/core/ucp_worker.c | 10 +++++----- src/uct/api/uct.h | 6 +++--- src/uct/sm/mm/mm_iface.c | 2 +- src/uct/sm/self/self_iface.c | 2 +- test/gtest/ucp/test_ucp_atomic.cc | 4 ++-- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/tools/info/tl_info.c b/src/tools/info/tl_info.c index 8fbdcfa44f5..04400c6fec5 100644 --- a/src/tools/info/tl_info.c +++ b/src/tools/info/tl_info.c @@ -22,8 +22,8 @@ if ((_cap_flags) & (UCT_IFACE_FLAG_##_name##32 | UCT_IFACE_FLAG_##_name##64)) { \ char *s = strduplower(#_name); \ char *domain = ""; \ - if ((_cap_flags) & UCT_IFACE_FLAG_ATOMIC_HOST) { \ - domain = ", host"; \ + if ((_cap_flags) & UCT_IFACE_FLAG_ATOMIC_CPU) { \ + domain = ", cpu"; \ } else if ((_cap_flags) & UCT_IFACE_FLAG_ATOMIC_DEVICE) { \ domain = ", device"; \ } \ diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index f3016c39830..1110a881cf4 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -20,7 +20,7 @@ ucp_am_handler_t ucp_am_handlers[UCP_AM_ID_LAST] = {{0, NULL, NULL}}; static const char *ucp_atomic_modes[] = { - [UCP_ATOMIC_MODE_HOST] = "host", + [UCP_ATOMIC_MODE_CPU] = "cpu", [UCP_ATOMIC_MODE_DEVICE] = "device", [UCP_ATOMIC_MODE_LAST] = NULL, }; @@ -91,9 +91,9 @@ static ucs_config_field_t ucp_config_table[] = { {"ATOMIC_MODE", "device", "Atomic operations synchronization mode.\n" - " host - operations are atomic with respect to the host processor.\n" + " cpu - atomic operations are consistent with respect to the CPU.\n" " device - atomic operations are performed on one of the transport devices,\n" - " and there is no atomicity guarantee with respect to the host processor.", + " and there is guarantee of consistency with respect to the CPU.", ucs_offsetof(ucp_config_t, ctx.atomic_mode), UCS_CONFIG_TYPE_ENUM(ucp_atomic_modes)}, {"LOG_DATA", "0", diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 6dfb220cf7d..f9b9248f511 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -71,7 +71,7 @@ enum { * Atomic operations mode. */ typedef enum { - UCP_ATOMIC_MODE_HOST, /* Use host-based atomics */ + UCP_ATOMIC_MODE_CPU, /* Use CPU-based atomics */ UCP_ATOMIC_MODE_DEVICE, /* Use device-based atomics */ UCP_ATOMIC_MODE_LAST } ucp_atomic_mode_t; diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index dcf362a96ca..2fef97d9c11 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -269,15 +269,15 @@ static void ucp_worker_enable_atomic_tl(ucp_worker_h worker, const char *mode, worker->atomic_tls |= UCS_BIT(rsc_index); } -static void ucp_worker_init_host_atomics(ucp_worker_h worker) +static void ucp_worker_init_cpu_atomics(ucp_worker_h worker) { ucp_context_h context = worker->context; ucp_rsc_index_t rsc_index; /* Enable all interfaces which have host-based atomics */ for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { - if (worker->iface_attrs[rsc_index].cap.flags & UCT_IFACE_FLAG_ATOMIC_HOST) { - ucp_worker_enable_atomic_tl(worker, "host", rsc_index); + if (worker->iface_attrs[rsc_index].cap.flags & UCT_IFACE_FLAG_ATOMIC_CPU) { + ucp_worker_enable_atomic_tl(worker, "cpu", rsc_index); } } } @@ -360,8 +360,8 @@ static void ucp_worker_init_atomic_tls(ucp_worker_h worker) worker->atomic_tls = 0; if (context->config.features & (UCP_FEATURE_AMO32|UCP_FEATURE_AMO64)) { - if (context->config.ext.atomic_mode == UCP_ATOMIC_MODE_HOST) { - ucp_worker_init_host_atomics(worker); + if (context->config.ext.atomic_mode == UCP_ATOMIC_MODE_CPU) { + ucp_worker_init_cpu_atomics(worker); } else if (context->config.ext.atomic_mode == UCP_ATOMIC_MODE_DEVICE) { ucp_worker_init_device_atomics(worker); } diff --git a/src/uct/api/uct.h b/src/uct/api/uct.h index 739c81dcc5c..405cde4035d 100644 --- a/src/uct/api/uct.h +++ b/src/uct/api/uct.h @@ -173,9 +173,9 @@ enum { UCT_IFACE_FLAG_ATOMIC_CSWAP64 = UCS_BIT(23), /**< 64bit atomic compare-and-swap */ /* Atomic operations domain */ - UCT_IFACE_FLAG_ATOMIC_HOST = UCS_BIT(30), /**< Atomic communications are atomic - with respect to host operations. */ - UCT_IFACE_FLAG_ATOMIC_DEVICE = UCS_BIT(31), /**< Atomic communications are atomic + UCT_IFACE_FLAG_ATOMIC_CPU = UCS_BIT(30), /**< Atomic communications are consistent + with respect to CPU operations. */ + UCT_IFACE_FLAG_ATOMIC_DEVICE = UCS_BIT(31), /**< Atomic communications are consistent only with respect to other atomics on the same device. */ diff --git a/src/uct/sm/mm/mm_iface.c b/src/uct/sm/mm/mm_iface.c index 4af4e743de9..9409769ed12 100644 --- a/src/uct/sm/mm/mm_iface.c +++ b/src/uct/sm/mm/mm_iface.c @@ -103,7 +103,7 @@ static ucs_status_t uct_mm_iface_query(uct_iface_h tl_iface, UCT_IFACE_FLAG_ATOMIC_SWAP32 | UCT_IFACE_FLAG_ATOMIC_CSWAP64 | UCT_IFACE_FLAG_ATOMIC_CSWAP32 | - UCT_IFACE_FLAG_ATOMIC_HOST | + UCT_IFACE_FLAG_ATOMIC_CPU | UCT_IFACE_FLAG_GET_BCOPY | UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_AM_BCOPY | diff --git a/src/uct/sm/self/self_iface.c b/src/uct/sm/self/self_iface.c index 4f0997058a1..6f1c41615ac 100644 --- a/src/uct/sm/self/self_iface.c +++ b/src/uct/sm/self/self_iface.c @@ -45,7 +45,7 @@ static ucs_status_t uct_self_iface_query(uct_iface_h iface, uct_iface_attr_t *at UCT_IFACE_FLAG_ATOMIC_SWAP32 | UCT_IFACE_FLAG_ATOMIC_CSWAP64 | UCT_IFACE_FLAG_ATOMIC_CSWAP32 | - UCT_IFACE_FLAG_ATOMIC_HOST | + UCT_IFACE_FLAG_ATOMIC_CPU | UCT_IFACE_FLAG_PENDING | UCT_IFACE_FLAG_AM_CB_SYNC; diff --git a/test/gtest/ucp/test_ucp_atomic.cc b/test/gtest/ucp/test_ucp_atomic.cc index 3d1ce254943..353ab4f5877 100644 --- a/test/gtest/ucp/test_ucp_atomic.cc +++ b/test/gtest/ucp/test_ucp_atomic.cc @@ -17,7 +17,7 @@ test_ucp_atomic::enum_test_params(const ucp_params_t& ctx_params, { std::vector result; generate_test_params_variant(ctx_params, name, test_case_name, tls, - UCP_ATOMIC_MODE_HOST, result); + UCP_ATOMIC_MODE_CPU, result); generate_test_params_variant(ctx_params, name, test_case_name, tls, UCP_ATOMIC_MODE_DEVICE, result); return result; @@ -25,7 +25,7 @@ test_ucp_atomic::enum_test_params(const ucp_params_t& ctx_params, void test_ucp_atomic::init() { const char *atomic_mode = - (GetParam().variant == UCP_ATOMIC_MODE_HOST) ? "host" : + (GetParam().variant == UCP_ATOMIC_MODE_CPU) ? "cpu" : (GetParam().variant == UCP_ATOMIC_MODE_DEVICE) ? "device" : ""; modify_config("ATOMIC_MODE", atomic_mode);