diff --git a/src/ucp/wireup/address.c b/src/ucp/wireup/address.c index 43cdaca9de3e..d828a471af69 100644 --- a/src/ucp/wireup/address.c +++ b/src/ucp/wireup/address.c @@ -5,7 +5,7 @@ */ #include "address.h" -#include "wireup_ep.h" +#include "stub_ep.h" #include #include @@ -47,17 +47,12 @@ typedef struct { typedef struct { float overhead; float bandwidth; - float lat_ovh; + double lat_ovh; uint32_t prio_cap_flags; /* 8 lsb: prio, 24 msb - cap flags */ } ucp_address_packed_iface_attr_t; #define UCP_ADDRESS_FLAG_LAST 0x80 /* Last address in the list */ -#define UCP_ADDRESS_FLAG_EP_ADDR 0x40 /* Indicates that ep addr is packed - right after iface addr */ -#define UCP_ADDRESS_FLAG_LEN_MASK ~(UCP_ADDRESS_FLAG_EP_ADDR | \ - UCP_ADDRESS_FLAG_LAST) - #define UCP_ADDRESS_FLAG_EMPTY 0x80 /* Device without TL addresses */ #define UCP_ADDRESS_FLAG_MD_ALLOC 0x40 /* MD can register */ #define UCP_ADDRESS_FLAG_MD_REG 0x20 /* MD can allocate */ @@ -141,23 +136,22 @@ ucp_address_gather_devices(ucp_worker_h worker, uint64_t tl_bitmap, int has_ep, dev = ucp_address_get_device(context->tl_rscs[i].tl_rsc.dev_name, devices, &num_devices); - iface_attr = &worker->ifaces[i].attr; - - if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) && - !(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP)) { + iface_attr = &worker->iface_attrs[i]; + if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) { + dev->tl_addrs_size += iface_attr->iface_addr_len; + } else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) { + if (has_ep) { + dev->tl_addrs_size += iface_attr->ep_addr_len; + } else { + /* Empty address */ + } + } else { continue; } - dev->tl_addrs_size += iface_attr->iface_addr_len; - - if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) && has_ep) { - /* ep address and its length */ - dev->tl_addrs_size += 1 + iface_attr->ep_addr_len; - } - dev->tl_addrs_size += sizeof(uint16_t); /* tl name checksum */ dev->tl_addrs_size += sizeof(ucp_address_packed_iface_attr_t); /* iface attr */ - dev->tl_addrs_size += 1; /* iface address length */ + dev->tl_addrs_size += 1; /* address length */ dev->rsc_index = i; dev->dev_addr_len = iface_attr->device_addr_len; dev->tl_bitmap |= mask; @@ -213,7 +207,7 @@ ucp_address_pack_ep_address(ucp_ep_h ep, ucp_rsc_index_t tl_index, for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { if (ucp_ep_get_rsc_index(ep, lane) == tl_index) { /* - * If this is a wireup endpoint, it will return the underlying next_ep + * If this is a stub endpoint, it will return the underlying next_ep * address, and the length will be correct because the resource index * is of the next_ep. */ @@ -295,12 +289,10 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep, ucp_rsc_index_t md_index; ucs_status_t status; ucp_rsc_index_t i; - size_t iface_addr_len; - size_t ep_addr_len; + size_t tl_addr_len; uint64_t md_flags; unsigned index; void *ptr; - uint8_t *iface_addr_len_ptr; ptr = buffer; index = 0; @@ -335,7 +327,7 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep, ++ptr; /* Device address */ - status = uct_iface_get_device_address(worker->ifaces[dev->rsc_index].iface, + status = uct_iface_get_device_address(worker->ifaces[dev->rsc_index], (uct_device_addr_t*)ptr); if (status != UCS_OK) { return status; @@ -356,68 +348,57 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep, ptr += sizeof(uint16_t); /* Transport information */ - ucp_address_pack_iface_attr(ptr, &worker->ifaces[i].attr, + ucp_address_pack_iface_attr(ptr, &worker->iface_attrs[i], worker->atomic_tls & UCS_BIT(i)); ucp_address_memchek(ptr, sizeof(ucp_address_packed_iface_attr_t), &context->tl_rscs[dev->rsc_index].tl_rsc); ptr += sizeof(ucp_address_packed_iface_attr_t); - iface_attr = &worker->ifaces[i].attr; - - if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) && - !(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP)) { - return UCS_ERR_INVALID_ADDR; + /* Transport address length */ + iface_attr = &worker->iface_attrs[i]; + if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) { + tl_addr_len = iface_attr->iface_addr_len; + status = uct_iface_get_address(worker->ifaces[i], + (uct_iface_addr_t*)(ptr + 1)); + } else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) { + if (ep == NULL) { + tl_addr_len = 0; + status = UCS_OK; + } else { + tl_addr_len = iface_attr->ep_addr_len; + status = ucp_address_pack_ep_address(ep, i, ptr + 1); + } + } else { + status = UCS_ERR_INVALID_ADDR; } - - /* Pack iface address */ - iface_addr_len = iface_attr->iface_addr_len; - ucs_assert(iface_addr_len < UCP_ADDRESS_FLAG_EP_ADDR); - - status = uct_iface_get_address(worker->ifaces[i].iface, - (uct_iface_addr_t*)(ptr + 1)); if (status != UCS_OK) { return status; } - ucp_address_memchek(ptr + 1, iface_addr_len, + + ucp_address_memchek(ptr + 1, tl_addr_len, &context->tl_rscs[dev->rsc_index].tl_rsc); - iface_addr_len_ptr = ptr; - *iface_addr_len_ptr = iface_addr_len | ((i == ucs_ilog2(dev->tl_bitmap)) ? - UCP_ADDRESS_FLAG_LAST : 0); - ptr += 1 + iface_addr_len; - - /* Pack ep address if present */ - if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) && - (ep != NULL)) { - *iface_addr_len_ptr |= UCP_ADDRESS_FLAG_EP_ADDR; - - ep_addr_len = iface_attr->ep_addr_len; - ucs_assert(ep_addr_len < UINT8_MAX); - *(uint8_t*)ptr = ep_addr_len; - - status = ucp_address_pack_ep_address(ep, i, ptr + 1); - if (status != UCS_OK) { - return status; - } - ucp_address_memchek(ptr + 1, ep_addr_len, - &context->tl_rscs[dev->rsc_index].tl_rsc); - ptr += 1 + ep_addr_len; - } /* Save the address index of this transport */ if (order != NULL) { order[ucs_count_one_bits(tl_bitmap & UCS_MASK(i))] = index; } + ucs_assert(tl_addr_len < UCP_ADDRESS_FLAG_LAST); + *(uint8_t*)ptr = tl_addr_len | ((i == ucs_ilog2(dev->tl_bitmap)) ? + UCP_ADDRESS_FLAG_LAST : 0); + ptr += 1 + tl_addr_len; + + ucs_trace("pack addr[%d] : "UCT_TL_RESOURCE_DESC_FMT " md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64" bw %e ovh %e " "lat_ovh: %e dev_priority %d", index, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[i].tl_rsc), - md_flags, worker->ifaces[i].attr.cap.flags, - worker->ifaces[i].attr.bandwidth, - worker->ifaces[i].attr.overhead, - worker->ifaces[i].attr.latency.overhead, - worker->ifaces[i].attr.priority); + md_flags, worker->iface_attrs[i].cap.flags, + worker->iface_attrs[i].bandwidth, + worker->iface_attrs[i].overhead, + worker->iface_attrs[i].latency.overhead, + worker->iface_attrs[i].priority); ++index; } } @@ -485,12 +466,11 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p, const uct_device_addr_t *dev_addr; ucp_rsc_index_t md_index; unsigned address_count; - int last_dev, last_tl, ep_addr_present; + int last_dev, last_tl; int empty_dev; uint64_t md_flags; size_t dev_addr_len; - size_t iface_addr_len; - size_t ep_addr_len; + size_t tl_addr_len; uint8_t md_byte; const void *ptr; const void *aptr; @@ -526,19 +506,15 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p, ptr += sizeof(uint16_t); /* tl_name_csum */ ptr += sizeof(ucp_address_packed_iface_attr_t); /* iface attr */ - /* iface and ep address lengths */ - iface_addr_len = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LEN_MASK; - last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST; - ep_addr_present = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_EP_ADDR; - ptr += 1 + iface_addr_len; - - if (ep_addr_present) { - ep_addr_len = *(uint8_t*)ptr; - ptr += 1 + ep_addr_len; - } + /* tl address length */ + tl_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST; + last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST; + ++ptr; ++address_count; ucs_assert(address_count <= UCP_MAX_RESOURCES); + + ptr += tl_addr_len; } } while (!last_dev); @@ -587,24 +563,16 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p, ptr += sizeof(ucp_address_packed_iface_attr_t); /* tl address length */ - iface_addr_len = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LEN_MASK; - last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST; - ep_addr_present = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_EP_ADDR; + tl_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST; + last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST; ++ptr; - address->dev_addr = (dev_addr_len > 0) ? dev_addr : NULL; - address->md_index = md_index; - address->md_flags = md_flags; - address->iface_addr = (iface_addr_len > 0) ? ptr : NULL; - ptr += iface_addr_len; - - if (ep_addr_present) { - ep_addr_len = *(uint8_t*)ptr; - address->ep_addr = (ep_addr_len > 0) ? ptr + 1 : NULL; - ptr += 1 + ep_addr_len; - } else { - address->ep_addr = NULL; - } + address->dev_addr = (dev_addr_len > 0) ? dev_addr : NULL; + address->dev_addr_len = dev_addr_len; + address->md_index = md_index; + address->md_flags = md_flags; + address->tl_addr = (tl_addr_len > 0) ? ptr : NULL; + address->tl_addr_len = tl_addr_len; ucs_trace("unpack addr[%d] : md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64" bw %e ovh %e " "lat_ovh %e dev_priority %d", @@ -614,6 +582,8 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p, address->iface_attr.lat_ovh, address->iface_attr.priority); ++address; + + ptr += tl_addr_len; } } while (!last_dev); diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 1f6ae4457b11..43098e623bf6 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -8,7 +8,6 @@ #include "address.h" #include -#include #include #include #include @@ -19,15 +18,13 @@ enum { UCP_WIREUP_LANE_USAGE_AM = UCS_BIT(0), UCP_WIREUP_LANE_USAGE_RMA = UCS_BIT(1), UCP_WIREUP_LANE_USAGE_AMO = UCS_BIT(2), - UCP_WIREUP_LANE_USAGE_RNDV = UCS_BIT(3), - UCP_WIREUP_LANE_USAGE_TAG = UCS_BIT(4) + UCP_WIREUP_LANE_USAGE_RNDV = UCS_BIT(3) }; typedef struct { ucp_rsc_index_t rsc_index; unsigned addr_index; - ucp_lane_index_t proxy_lane; ucp_rsc_index_t dst_md_index; uint32_t usage; double rma_score; @@ -58,20 +55,13 @@ static const char *ucp_wireup_iface_flags[] = { [ucs_ilog2(UCT_IFACE_FLAG_ATOMIC_SWAP64)] = "64-bit atomic swap", [ucs_ilog2(UCT_IFACE_FLAG_ATOMIC_CSWAP32)] = "32-bit atomic compare-swap", [ucs_ilog2(UCT_IFACE_FLAG_ATOMIC_CSWAP64)] = "64-bit atomic compare-swap", - [ucs_ilog2(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)] = "peer failure handler", [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_IFACE)] = "connect to iface", [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_EP)] = "connect to ep", [ucs_ilog2(UCT_IFACE_FLAG_AM_DUP)] = "full reliability", - [ucs_ilog2(UCT_IFACE_FLAG_CB_SYNC)] = "sync callback", - [ucs_ilog2(UCT_IFACE_FLAG_CB_ASYNC)] = "async callback", - [ucs_ilog2(UCT_IFACE_FLAG_EVENT_SEND_COMP)] = "send completion event", - [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV_AM)] = "active message event", - [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV_SIG_AM)]= "signaled message event", - [ucs_ilog2(UCT_IFACE_FLAG_PENDING)] = "pending", - [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_SHORT)] = "tag eager short", - [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_BCOPY)] = "tag eager bcopy", - [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY)] = "tag eager zcopy", - [ucs_ilog2(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY)] = "tag rndv zcopy" + [ucs_ilog2(UCT_IFACE_FLAG_AM_CB_SYNC)] = "sync am callback", + [ucs_ilog2(UCT_IFACE_FLAG_AM_CB_ASYNC)] = "async am callback", + [ucs_ilog2(UCT_IFACE_FLAG_WAKEUP)] = "wakeup", + [ucs_ilog2(UCT_IFACE_FLAG_PENDING)] = "pending" }; static double ucp_wireup_aux_score_func(ucp_context_h context, @@ -79,6 +69,19 @@ static double ucp_wireup_aux_score_func(ucp_context_h context, const uct_iface_attr_t *iface_attr, const ucp_address_iface_attr_t *remote_iface_attr); +static ucp_wireup_criteria_t ucp_wireup_aux_criteria = { + .title = "auxiliary", + .local_md_flags = 0, + .remote_md_flags = 0, + .local_iface_flags = UCT_IFACE_FLAG_CONNECT_TO_IFACE | + UCT_IFACE_FLAG_AM_BCOPY | + UCT_IFACE_FLAG_PENDING, + .remote_iface_flags = UCT_IFACE_FLAG_CONNECT_TO_IFACE | + UCT_IFACE_FLAG_AM_BCOPY | + UCT_IFACE_FLAG_AM_CB_ASYNC, + .calc_score = ucp_wireup_aux_score_func +}; + static const char * ucp_wireup_get_missing_flag_desc(uint64_t flags, uint64_t required_flags, const char ** flag_descs) @@ -115,8 +118,7 @@ static int ucp_wireup_is_reachable(ucp_worker_h worker, ucp_rsc_index_t rsc_inde { ucp_context_h context = worker->context; return (context->tl_rscs[rsc_index].tl_name_csum == ae->tl_name_csum) && - uct_iface_is_reachable(worker->ifaces[rsc_index].iface, ae->dev_addr, - ae->iface_addr); + uct_iface_is_reachable(worker->ifaces[rsc_index], ae->dev_addr, ae->iface_addr); } /** @@ -144,7 +146,6 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list int reachable; int found; uint8_t priority, best_score_priority; - float epsilon; /* a small value to overcome float imprecision */ found = 0; best_score = 0.0; @@ -188,7 +189,7 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list } if (!addr_index_map) { - snprintf(p, endp - p, "%s ", ucs_status_string(UCS_ERR_UNSUPPORTED)); + snprintf(p, endp - p, "not supported by peer "); p += strlen(p); } @@ -198,7 +199,7 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list * has a reachable tl on the remote peer */ for (rsc_index = 0; addr_index_map && (rsc_index < context->num_tls); ++rsc_index) { resource = &context->tl_rscs[rsc_index].tl_rsc; - iface_attr = &worker->ifaces[rsc_index].attr; + iface_attr = &worker->iface_attrs[rsc_index]; md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; /* Check that local md and interface satisfy the criteria */ @@ -249,9 +250,8 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list /* First comparing score, if score equals to current best score, * comparing priority with the priority of best score */ - epsilon = (score + best_score) * (1e-6); - if (!found || (score > (best_score + epsilon)) || - ((fabs(score - best_score) < epsilon) && (priority > best_score_priority))) { + if (!found || (score > best_score) || + ((score == best_score) && (priority > best_score_priority))) { *rsc_index_p = rsc_index; *dst_addr_index_p = ae - address_list; *score_p = score; @@ -265,9 +265,8 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list * debug message. */ if (!reachable) { - snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - %s, ", - UCT_TL_RESOURCE_DESC_ARG(resource), - ucs_status_string(UCS_ERR_UNREACHABLE)); + snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - cannot reach peer, ", + UCT_TL_RESOURCE_DESC_ARG(resource)); p += strlen(p); } } @@ -284,10 +283,9 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list return UCS_ERR_UNREACHABLE; } - ucs_trace("ep %p: selected for %s: " UCT_TL_RESOURCE_DESC_FMT " md[%d]" + ucs_trace("ep %p: selected for %s: " UCT_TL_RESOURCE_DESC_FMT " -> '%s' address[%d],md[%d] score %.2f", ep, criteria->title, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[*rsc_index_p].tl_rsc), - context->tl_rscs[*rsc_index_p].md_index, ucp_ep_peer_name(ep), *dst_addr_index_p, address_list[*dst_addr_index_p].md_index, best_score); return UCS_OK; @@ -305,58 +303,31 @@ static UCS_F_NOINLINE void ucp_wireup_add_lane_desc(ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t *num_lanes_p, ucp_rsc_index_t rsc_index, unsigned addr_index, ucp_rsc_index_t dst_md_index, - double score, uint32_t usage, int is_proxy) + double score, uint32_t usage) { ucp_wireup_lane_desc_t *lane_desc; - ucp_lane_index_t lane, proxy_lane; - int proxy_changed; - /* Add a new lane, but try to reuse already added lanes which are selected - * on the same transport resources. - */ - proxy_changed = 0; for (lane_desc = lane_descs; lane_desc < lane_descs + (*num_lanes_p); ++lane_desc) { if ((lane_desc->rsc_index == rsc_index) && (lane_desc->addr_index == addr_index)) { - lane = lane_desc - lane_descs; ucs_assertv_always(dst_md_index == lane_desc->dst_md_index, "lane[%d].dst_md_index=%d, dst_md_index=%d", - lane, lane_desc->dst_md_index, dst_md_index); + (int)(lane_desc - lane_descs), lane_desc->dst_md_index, + dst_md_index); ucs_assertv_always(!(lane_desc->usage & usage), "lane[%d]=0x%x |= 0x%x", - lane, lane_desc->usage, usage); - if (is_proxy && (lane_desc->proxy_lane == UCP_NULL_LANE)) { - /* New lane is a proxy, and found existing non-proxy lane with - * same resource. So that lane should be used by the proxy. - */ - proxy_lane = lane; - goto out_add_lane; - } else if (!is_proxy && (lane_desc->proxy_lane == lane)) { - /* New lane is not a proxy, but found existing proxy lane which - * could use the new lane. It also means we should be able to - * add our new lane. - */ - lane_desc->proxy_lane = *num_lanes_p; - proxy_changed = 1; - } else if (!is_proxy && (lane_desc->proxy_lane == UCP_NULL_LANE)) { - /* Found non-proxy lane with same resource - don't add */ - ucs_assert_always(!proxy_changed); - lane_desc->usage |= usage; - goto out_update_score; - } + (int)(lane_desc - lane_descs), lane_desc->usage, + usage); + lane_desc->usage |= usage; + goto out_update_score; } } - /* If a proxy cannot find other lane with same resource, proxy to self */ - proxy_lane = is_proxy ? (*num_lanes_p) : UCP_NULL_LANE; - -out_add_lane: lane_desc = &lane_descs[*num_lanes_p]; ++(*num_lanes_p); lane_desc->rsc_index = rsc_index; lane_desc->addr_index = addr_index; - lane_desc->proxy_lane = proxy_lane; lane_desc->dst_md_index = dst_md_index; lane_desc->usage = usage; lane_desc->rma_score = 0.0; @@ -443,7 +414,7 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, /* Add to the list of lanes and remove all occurrences of the remote md * from the address list, to avoid selecting the same remote md again.*/ ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, - dst_md_index, score, usage, 0); + dst_md_index, score, usage); remote_md_map &= ~UCS_BIT(dst_md_index); /* Select additional transports which can access allocated memory, but only @@ -466,7 +437,7 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count, /* Add lane description and remove all occurrences of the remote md */ dst_md_index = address_list_copy[addr_index].md_index; ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, - dst_md_index, score, usage, 0); + dst_md_index, score, usage); remote_md_map &= ~UCS_BIT(dst_md_index); } @@ -494,34 +465,7 @@ static double ucp_wireup_rma_score_func(ucp_context_h context, (4096.0 / ucs_min(iface_attr->bandwidth, remote_iface_attr->bandwidth))); } -static void ucp_wireup_fill_ep_params_criteria(ucp_wireup_criteria_t *criteria, - const ucp_ep_params_t *params) -{ - if ((params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) && - (params->err_mode == UCP_ERR_HANDLING_MODE_PEER)) { - criteria->local_iface_flags |= UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE; - } -} - -static void ucp_wireup_fill_aux_criteria(ucp_wireup_criteria_t *criteria, - const ucp_ep_params_t *params) -{ - criteria->title = "auxiliary"; - criteria->local_md_flags = 0; - criteria->remote_md_flags = 0; - criteria->local_iface_flags = UCT_IFACE_FLAG_CONNECT_TO_IFACE | - UCT_IFACE_FLAG_AM_BCOPY | - UCT_IFACE_FLAG_PENDING; - criteria->remote_iface_flags = UCT_IFACE_FLAG_CONNECT_TO_IFACE | - UCT_IFACE_FLAG_AM_BCOPY | - UCT_IFACE_FLAG_CB_ASYNC; - criteria->calc_score = ucp_wireup_aux_score_func; - - ucp_wireup_fill_ep_params_criteria(criteria, params); -} - -static ucs_status_t ucp_wireup_add_rma_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, - unsigned address_count, +static ucs_status_t ucp_wireup_add_rma_lanes(ucp_ep_h ep, unsigned address_count, const ucp_address_entry_t *address_list, ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t *num_lanes_p) @@ -541,7 +485,6 @@ static ucs_status_t ucp_wireup_add_rma_lanes(ucp_ep_h ep, const ucp_ep_params_t criteria.local_iface_flags = criteria.remote_iface_flags | UCT_IFACE_FLAG_PENDING; criteria.calc_score = ucp_wireup_rma_score_func; - ucp_wireup_fill_ep_params_criteria(&criteria, params); return ucp_wireup_add_memaccess_lanes(ep, address_count, address_list, lane_descs, num_lanes_p, &criteria, @@ -558,8 +501,7 @@ double ucp_wireup_amo_score_func(ucp_context_h context, iface_attr->overhead); } -static ucs_status_t ucp_wireup_add_amo_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, - unsigned address_count, +static ucs_status_t ucp_wireup_add_amo_lanes(ucp_ep_h ep, unsigned address_count, const ucp_address_entry_t *address_list, ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t *num_lanes_p) @@ -581,7 +523,6 @@ static ucs_status_t ucp_wireup_add_amo_lanes(ucp_ep_h ep, const ucp_ep_params_t criteria.local_iface_flags = criteria.remote_iface_flags | UCT_IFACE_FLAG_PENDING; criteria.calc_score = ucp_wireup_amo_score_func; - ucp_wireup_fill_ep_params_criteria(&criteria, params); /* We can use only non-p2p resources or resources which are explicitly * selected for atomics. Otherwise, the remote peer would not be able to @@ -625,24 +566,21 @@ static double ucp_wireup_rndv_score_func(ucp_context_h context, (UCP_WIREUP_RNDV_TEST_MSG_SIZE * md_attr->reg_cost.growth)); } -static ucs_status_t ucp_wireup_add_am_lane(ucp_ep_h ep, const ucp_ep_params_t *params, - unsigned address_count, +static ucs_status_t ucp_wireup_add_am_lane(ucp_ep_h ep, unsigned address_count, const ucp_address_entry_t *address_list, ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t *num_lanes_p) { ucp_wireup_criteria_t criteria; - uint64_t remote_cap_flags; ucp_rsc_index_t rsc_index; ucp_lane_index_t lane; ucs_status_t status; unsigned addr_index; - int is_proxy; double score; int need_am; /* Check if we need active messages, for wireup */ - if (!(ucp_ep_get_context_features(ep) & (UCP_FEATURE_TAG | UCP_FEATURE_STREAM))) { + if (!(ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG)) { need_am = 0; for (lane = 0; lane < *num_lanes_p; ++lane) { need_am = need_am || ucp_worker_is_tl_p2p(ep->worker, @@ -658,14 +596,13 @@ static ucs_status_t ucp_wireup_add_am_lane(ucp_ep_h ep, const ucp_ep_params_t *p criteria.local_md_flags = 0; criteria.remote_md_flags = 0; criteria.remote_iface_flags = UCT_IFACE_FLAG_AM_BCOPY | - UCT_IFACE_FLAG_CB_SYNC; + UCT_IFACE_FLAG_AM_CB_SYNC; criteria.local_iface_flags = UCT_IFACE_FLAG_AM_BCOPY; criteria.calc_score = ucp_wireup_am_score_func; - ucp_wireup_fill_ep_params_criteria(&criteria, params); if (ucs_test_all_flags(ucp_ep_get_context_features(ep), UCP_FEATURE_TAG | UCP_FEATURE_WAKEUP)) { - criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS; + criteria.remote_iface_flags |= UCT_IFACE_FLAG_WAKEUP; } status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, @@ -674,25 +611,13 @@ static ucs_status_t ucp_wireup_add_am_lane(ucp_ep_h ep, const ucp_ep_params_t *p return status; } - /* If the remote side is not p2p and has only signaled-am wakeup, it may - * deactivate its interface and wait for signaled active message to wake up. - * Use a proxy lane which would send the first active message as signaled to - * make sure the remote interface will indeed wake up. - */ - remote_cap_flags = address_list[addr_index].iface_attr.cap_flags; - is_proxy = !ucp_worker_is_tl_p2p(ep->worker, rsc_index) && - ((remote_cap_flags & UCP_WORKER_UCT_RECV_EVENT_CAP_FLAGS) == - UCT_IFACE_FLAG_EVENT_RECV_SIG_AM); - ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, address_list[addr_index].md_index, score, - UCP_WIREUP_LANE_USAGE_AM, is_proxy); + UCP_WIREUP_LANE_USAGE_AM); return UCS_OK; } -static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, - const ucp_ep_params_t *params, - unsigned address_count, +static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, unsigned address_count, const ucp_address_entry_t *address_list, ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t *num_lanes_p) @@ -715,10 +640,9 @@ static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, UCT_IFACE_FLAG_PENDING; criteria.local_iface_flags = UCT_IFACE_FLAG_GET_ZCOPY; criteria.calc_score = ucp_wireup_rndv_score_func; - ucp_wireup_fill_ep_params_criteria(&criteria, params); if (ucs_test_all_flags(ucp_ep_get_context_features(ep), UCP_FEATURE_WAKEUP)) { - criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS; + criteria.remote_iface_flags |= UCT_IFACE_FLAG_WAKEUP; } status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, @@ -728,62 +652,7 @@ static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, (strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") == NULL)) { ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, address_list[addr_index].md_index, score, - UCP_WIREUP_LANE_USAGE_RNDV, 0); - } - - return UCS_OK; -} - -/* Lane for transport offloaded tag interface */ -static ucs_status_t ucp_wireup_add_tag_lane(ucp_ep_h ep, unsigned address_count, - const ucp_address_entry_t *address_list, - ucp_wireup_lane_desc_t *lane_descs, - ucp_lane_index_t *num_lanes_p, - ucp_err_handling_mode_t err_mode) -{ - ucp_wireup_criteria_t criteria; - ucp_rsc_index_t rsc_index; - ucs_status_t status; - unsigned addr_index; - double score; - - if (!(ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG) || - !ep->worker->context->config.ext.tm_offload || - ucs_queue_is_empty(&ep->worker->context->tm.offload.ifaces) || - /* TODO: remove check below when UCP_ERR_HANDLING_MODE_PEER supports - * RNDV-protocol or HW TM supports fragmented protocols - */ - err_mode == UCP_ERR_HANDLING_MODE_PEER) - { - return UCS_OK; - } - - criteria.title = "tag_offload"; - criteria.local_md_flags = UCT_MD_FLAG_REG; /* needed for posting tags to HW */ - criteria.remote_md_flags = UCT_MD_FLAG_REG; /* needed for posting tags to HW */ - criteria.remote_iface_flags = UCT_IFACE_FLAG_TAG_EAGER_BCOPY | - UCT_IFACE_FLAG_TAG_RNDV_ZCOPY | - UCT_IFACE_FLAG_GET_ZCOPY | - UCT_IFACE_FLAG_PENDING; - criteria.local_iface_flags = UCT_IFACE_FLAG_TAG_EAGER_BCOPY | - UCT_IFACE_FLAG_TAG_RNDV_ZCOPY | - UCT_IFACE_FLAG_GET_ZCOPY | - UCT_IFACE_FLAG_PENDING; - - /* Use RMA score func for now (to target mid size messages). - * TODO: have to align to TM_THRESH value. */ - criteria.calc_score = ucp_wireup_rma_score_func; - - if (ucs_test_all_flags(ucp_ep_get_context_features(ep), UCP_FEATURE_WAKEUP)) { - criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS; - } - - status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, - -1, -1, 0, &rsc_index, &addr_index, &score); - if (status == UCS_OK) { - ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, - address_list[addr_index].md_index, score, - UCP_WIREUP_LANE_USAGE_TAG, 0); + UCP_WIREUP_LANE_USAGE_RNDV); } return UCS_OK; @@ -791,20 +660,17 @@ static ucs_status_t ucp_wireup_add_tag_lane(ucp_ep_h ep, unsigned address_count, static ucp_lane_index_t ucp_wireup_select_wireup_msg_lane(ucp_worker_h worker, - const ucp_ep_params_t *ep_params, const ucp_address_entry_t *address_list, const ucp_wireup_lane_desc_t *lane_descs, ucp_lane_index_t num_lanes) { ucp_context_h context = worker->context; - ucp_lane_index_t p2p_lane = UCP_NULL_LANE; + ucp_lane_index_t p2p_lane = UCP_NULL_RESOURCE; uct_tl_resource_desc_t *resource; - ucp_wireup_criteria_t criteria; ucp_rsc_index_t rsc_index; ucp_lane_index_t lane; unsigned addr_index; - ucp_wireup_fill_aux_criteria(&criteria, ep_params); for (lane = 0; lane < num_lanes; ++lane) { rsc_index = lane_descs[lane].rsc_index; addr_index = lane_descs[lane].addr_index; @@ -813,12 +679,14 @@ ucp_wireup_select_wireup_msg_lane(ucp_worker_h worker, /* if the current lane satisfies the wireup criteria, choose it for wireup. * if it doesn't take a lane with a p2p transport */ if (ucp_wireup_check_flags(resource, - worker->ifaces[rsc_index].attr.cap.flags, - criteria.local_iface_flags, criteria.title, + worker->iface_attrs[rsc_index].cap.flags, + ucp_wireup_aux_criteria.local_iface_flags, + ucp_wireup_aux_criteria.title, ucp_wireup_iface_flags, NULL, 0) && ucp_wireup_check_flags(resource, address_list[addr_index].iface_attr.cap_flags, - criteria.remote_iface_flags, criteria.title, + ucp_wireup_aux_criteria.remote_iface_flags, + ucp_wireup_aux_criteria.title, ucp_wireup_iface_flags, NULL, 0)) { return lane; @@ -850,8 +718,7 @@ ucp_wireup_get_reachable_mds(ucp_worker_h worker, unsigned address_count, return reachable_mds; } -ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, - unsigned address_count, +ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, unsigned address_count, const ucp_address_entry_t *address_list, uint8_t *addr_indices, ucp_ep_config_key_t *key) @@ -864,41 +731,30 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, memset(lane_descs, 0, sizeof(lane_descs)); ucp_ep_config_key_reset(key); - if (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) { - key->err_mode = params->err_mode; - } - - status = ucp_wireup_add_rma_lanes(ep, params, address_count, address_list, + status = ucp_wireup_add_rma_lanes(ep, address_count, address_list, lane_descs, &key->num_lanes); if (status != UCS_OK) { return status; } - status = ucp_wireup_add_amo_lanes(ep, params, address_count, address_list, + status = ucp_wireup_add_amo_lanes(ep, address_count, address_list, lane_descs, &key->num_lanes); if (status != UCS_OK) { return status; } - status = ucp_wireup_add_am_lane(ep, params, address_count, address_list, + status = ucp_wireup_add_am_lane(ep, address_count, address_list, lane_descs, &key->num_lanes); if (status != UCS_OK) { return status; } - status = ucp_wireup_add_rndv_lane(ep, params, address_count, address_list, + status = ucp_wireup_add_rndv_lane(ep, address_count, address_list, lane_descs, &key->num_lanes); if (status != UCS_OK) { return status; } - status = ucp_wireup_add_tag_lane(ep, address_count, address_list, - lane_descs, &key->num_lanes, - key->err_mode); - if (status != UCS_OK) { - return status; - } - /* User should not create endpoints unless requested communication features */ if (key->num_lanes == 0) { ucs_error("No transports selected to %s (features: 0x%lx)", @@ -914,7 +770,6 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, for (lane = 0; lane < key->num_lanes; ++lane) { ucs_assert(lane_descs[lane].usage != 0); key->lanes[lane].rsc_index = lane_descs[lane].rsc_index; - key->lanes[lane].proxy_lane = lane_descs[lane].proxy_lane; key->lanes[lane].dst_md_index = lane_descs[lane].dst_md_index; addr_indices[lane] = lane_descs[lane].addr_index; @@ -932,10 +787,6 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_AMO) { key->amo_lanes[lane] = lane; } - if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_TAG) { - ucs_assert(key->tag_lane == UCP_NULL_LANE); - key->tag_lane = lane; - } } /* Sort RMA and AMO lanes according to score */ @@ -949,10 +800,8 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params, address_list); /* Select lane for wireup messages */ - key->wireup_lane = ucp_wireup_select_wireup_msg_lane(worker, params, - address_list, - lane_descs, - key->num_lanes); + key->wireup_lane = ucp_wireup_select_wireup_msg_lane(worker, address_list, + lane_descs, key->num_lanes); return UCS_OK; } @@ -964,21 +813,18 @@ static double ucp_wireup_aux_score_func(ucp_context_h context, { /* best end-to-end latency and larger bcopy size */ return (1e-3 / (ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr) + - iface_attr->overhead + remote_iface_attr->overhead)); + iface_attr->overhead + remote_iface_attr->overhead)) + + (1e3 * ucs_max(iface_attr->cap.am.max_bcopy, iface_attr->cap.am.max_short)); } ucs_status_t ucp_wireup_select_aux_transport(ucp_ep_h ep, - const ucp_ep_params_t *params, const ucp_address_entry_t *address_list, unsigned address_count, ucp_rsc_index_t *rsc_index_p, unsigned *addr_index_p) { - ucp_wireup_criteria_t criteria; double score; - - ucp_wireup_fill_aux_criteria(&criteria, params); return ucp_wireup_select_transport(ep, address_list, address_count, - &criteria, -1, -1, 1, rsc_index_p, - addr_index_p, &score); + &ucp_wireup_aux_criteria, -1, -1, 1, + rsc_index_p, addr_index_p, &score); }