Skip to content

Commit

Permalink
UCP: Add the latency.overhead to the passed address.
Browse files Browse the repository at this point in the history
- Add the lanency.overhead to the passed address so that each rank can
  see the same values when selecting a lane - since this value maybe
  different for different ranks.

- Consider the remote peer's bandwidth in the rndv score function - this
  will allow support for cases where different ranks have different
  speeds on their HCAs - heterogeneous fabric.

- enhance the logging for pack/unpack address - include the priority of
  the device and the lantency overhead.

fixes openucx#1534

(cherry picked from commit eb7fd1b)
  • Loading branch information
alinask authored and boehms committed Oct 17, 2017
1 parent 4f5ee7b commit b9a886a
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 310 deletions.
158 changes: 64 additions & 94 deletions src/ucp/wireup/address.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

#include "address.h"
#include "wireup_ep.h"
#include "stub_ep.h"

#include <ucp/core/ucp_worker.h>
#include <ucp/core/ucp_ep.inl>
Expand Down Expand Up @@ -47,17 +47,12 @@ typedef struct {
typedef struct {
float overhead;
float bandwidth;
float lat_ovh;
double lat_ovh;
uint32_t prio_cap_flags; /* 8 lsb: prio, 24 msb - cap flags */
} ucp_address_packed_iface_attr_t;


#define UCP_ADDRESS_FLAG_LAST 0x80 /* Last address in the list */
#define UCP_ADDRESS_FLAG_EP_ADDR 0x40 /* Indicates that ep addr is packed
right after iface addr */
#define UCP_ADDRESS_FLAG_LEN_MASK ~(UCP_ADDRESS_FLAG_EP_ADDR | \
UCP_ADDRESS_FLAG_LAST)

#define UCP_ADDRESS_FLAG_EMPTY 0x80 /* Device without TL addresses */
#define UCP_ADDRESS_FLAG_MD_ALLOC 0x40 /* MD can register */
#define UCP_ADDRESS_FLAG_MD_REG 0x20 /* MD can allocate */
Expand Down Expand Up @@ -141,23 +136,22 @@ ucp_address_gather_devices(ucp_worker_h worker, uint64_t tl_bitmap, int has_ep,
dev = ucp_address_get_device(context->tl_rscs[i].tl_rsc.dev_name,
devices, &num_devices);

iface_attr = &worker->ifaces[i].attr;

if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) &&
!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP)) {
iface_attr = &worker->iface_attrs[i];
if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
dev->tl_addrs_size += iface_attr->iface_addr_len;
} else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
if (has_ep) {
dev->tl_addrs_size += iface_attr->ep_addr_len;
} else {
/* Empty address */
}
} else {
continue;
}

dev->tl_addrs_size += iface_attr->iface_addr_len;

if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) && has_ep) {
/* ep address and its length */
dev->tl_addrs_size += 1 + iface_attr->ep_addr_len;
}

dev->tl_addrs_size += sizeof(uint16_t); /* tl name checksum */
dev->tl_addrs_size += sizeof(ucp_address_packed_iface_attr_t); /* iface attr */
dev->tl_addrs_size += 1; /* iface address length */
dev->tl_addrs_size += 1; /* address length */
dev->rsc_index = i;
dev->dev_addr_len = iface_attr->device_addr_len;
dev->tl_bitmap |= mask;
Expand Down Expand Up @@ -213,7 +207,7 @@ ucp_address_pack_ep_address(ucp_ep_h ep, ucp_rsc_index_t tl_index,
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (ucp_ep_get_rsc_index(ep, lane) == tl_index) {
/*
* If this is a wireup endpoint, it will return the underlying next_ep
* If this is a stub endpoint, it will return the underlying next_ep
* address, and the length will be correct because the resource index
* is of the next_ep.
*/
Expand Down Expand Up @@ -295,12 +289,10 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep,
ucp_rsc_index_t md_index;
ucs_status_t status;
ucp_rsc_index_t i;
size_t iface_addr_len;
size_t ep_addr_len;
size_t tl_addr_len;
uint64_t md_flags;
unsigned index;
void *ptr;
uint8_t *iface_addr_len_ptr;

ptr = buffer;
index = 0;
Expand Down Expand Up @@ -335,7 +327,7 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep,
++ptr;

/* Device address */
status = uct_iface_get_device_address(worker->ifaces[dev->rsc_index].iface,
status = uct_iface_get_device_address(worker->ifaces[dev->rsc_index],
(uct_device_addr_t*)ptr);
if (status != UCS_OK) {
return status;
Expand All @@ -356,68 +348,57 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep,
ptr += sizeof(uint16_t);

/* Transport information */
ucp_address_pack_iface_attr(ptr, &worker->ifaces[i].attr,
ucp_address_pack_iface_attr(ptr, &worker->iface_attrs[i],
worker->atomic_tls & UCS_BIT(i));
ucp_address_memchek(ptr, sizeof(ucp_address_packed_iface_attr_t),
&context->tl_rscs[dev->rsc_index].tl_rsc);
ptr += sizeof(ucp_address_packed_iface_attr_t);

iface_attr = &worker->ifaces[i].attr;

if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) &&
!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP)) {
return UCS_ERR_INVALID_ADDR;
/* Transport address length */
iface_attr = &worker->iface_attrs[i];
if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
tl_addr_len = iface_attr->iface_addr_len;
status = uct_iface_get_address(worker->ifaces[i],
(uct_iface_addr_t*)(ptr + 1));
} else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
if (ep == NULL) {
tl_addr_len = 0;
status = UCS_OK;
} else {
tl_addr_len = iface_attr->ep_addr_len;
status = ucp_address_pack_ep_address(ep, i, ptr + 1);
}
} else {
status = UCS_ERR_INVALID_ADDR;
}

/* Pack iface address */
iface_addr_len = iface_attr->iface_addr_len;
ucs_assert(iface_addr_len < UCP_ADDRESS_FLAG_EP_ADDR);

status = uct_iface_get_address(worker->ifaces[i].iface,
(uct_iface_addr_t*)(ptr + 1));
if (status != UCS_OK) {
return status;
}
ucp_address_memchek(ptr + 1, iface_addr_len,

ucp_address_memchek(ptr + 1, tl_addr_len,
&context->tl_rscs[dev->rsc_index].tl_rsc);
iface_addr_len_ptr = ptr;
*iface_addr_len_ptr = iface_addr_len | ((i == ucs_ilog2(dev->tl_bitmap)) ?
UCP_ADDRESS_FLAG_LAST : 0);
ptr += 1 + iface_addr_len;

/* Pack ep address if present */
if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) &&
(ep != NULL)) {
*iface_addr_len_ptr |= UCP_ADDRESS_FLAG_EP_ADDR;

ep_addr_len = iface_attr->ep_addr_len;
ucs_assert(ep_addr_len < UINT8_MAX);
*(uint8_t*)ptr = ep_addr_len;

status = ucp_address_pack_ep_address(ep, i, ptr + 1);
if (status != UCS_OK) {
return status;
}
ucp_address_memchek(ptr + 1, ep_addr_len,
&context->tl_rscs[dev->rsc_index].tl_rsc);
ptr += 1 + ep_addr_len;
}

/* Save the address index of this transport */
if (order != NULL) {
order[ucs_count_one_bits(tl_bitmap & UCS_MASK(i))] = index;
}

ucs_assert(tl_addr_len < UCP_ADDRESS_FLAG_LAST);
*(uint8_t*)ptr = tl_addr_len | ((i == ucs_ilog2(dev->tl_bitmap)) ?
UCP_ADDRESS_FLAG_LAST : 0);
ptr += 1 + tl_addr_len;


ucs_trace("pack addr[%d] : "UCT_TL_RESOURCE_DESC_FMT
" md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64" bw %e ovh %e "
"lat_ovh: %e dev_priority %d",
index,
UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[i].tl_rsc),
md_flags, worker->ifaces[i].attr.cap.flags,
worker->ifaces[i].attr.bandwidth,
worker->ifaces[i].attr.overhead,
worker->ifaces[i].attr.latency.overhead,
worker->ifaces[i].attr.priority);
md_flags, worker->iface_attrs[i].cap.flags,
worker->iface_attrs[i].bandwidth,
worker->iface_attrs[i].overhead,
worker->iface_attrs[i].latency.overhead,
worker->iface_attrs[i].priority);
++index;
}
}
Expand Down Expand Up @@ -485,12 +466,11 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p,
const uct_device_addr_t *dev_addr;
ucp_rsc_index_t md_index;
unsigned address_count;
int last_dev, last_tl, ep_addr_present;
int last_dev, last_tl;
int empty_dev;
uint64_t md_flags;
size_t dev_addr_len;
size_t iface_addr_len;
size_t ep_addr_len;
size_t tl_addr_len;
uint8_t md_byte;
const void *ptr;
const void *aptr;
Expand Down Expand Up @@ -526,19 +506,15 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p,
ptr += sizeof(uint16_t); /* tl_name_csum */
ptr += sizeof(ucp_address_packed_iface_attr_t); /* iface attr */

/* iface and ep address lengths */
iface_addr_len = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LEN_MASK;
last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
ep_addr_present = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_EP_ADDR;
ptr += 1 + iface_addr_len;

if (ep_addr_present) {
ep_addr_len = *(uint8_t*)ptr;
ptr += 1 + ep_addr_len;
}
/* tl address length */
tl_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST;
last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
++ptr;

++address_count;
ucs_assert(address_count <= UCP_MAX_RESOURCES);

ptr += tl_addr_len;
}

} while (!last_dev);
Expand Down Expand Up @@ -587,24 +563,16 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p,
ptr += sizeof(ucp_address_packed_iface_attr_t);

/* tl address length */
iface_addr_len = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LEN_MASK;
last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
ep_addr_present = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_EP_ADDR;
tl_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST;
last_tl = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
++ptr;

address->dev_addr = (dev_addr_len > 0) ? dev_addr : NULL;
address->md_index = md_index;
address->md_flags = md_flags;
address->iface_addr = (iface_addr_len > 0) ? ptr : NULL;
ptr += iface_addr_len;

if (ep_addr_present) {
ep_addr_len = *(uint8_t*)ptr;
address->ep_addr = (ep_addr_len > 0) ? ptr + 1 : NULL;
ptr += 1 + ep_addr_len;
} else {
address->ep_addr = NULL;
}
address->dev_addr = (dev_addr_len > 0) ? dev_addr : NULL;
address->dev_addr_len = dev_addr_len;
address->md_index = md_index;
address->md_flags = md_flags;
address->tl_addr = (tl_addr_len > 0) ? ptr : NULL;
address->tl_addr_len = tl_addr_len;

ucs_trace("unpack addr[%d] : md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64" bw %e ovh %e "
"lat_ovh %e dev_priority %d",
Expand All @@ -614,6 +582,8 @@ ucs_status_t ucp_address_unpack(const void *buffer, uint64_t *remote_uuid_p,
address->iface_attr.lat_ovh,
address->iface_attr.priority);
++address;

ptr += tl_addr_len;
}
} while (!last_dev);

Expand Down
Loading

0 comments on commit b9a886a

Please sign in to comment.