Skip to content

Commit

Permalink
Merge pull request #976 from yosefe/topic/atomics-mode-host-dev
Browse files Browse the repository at this point in the history
Select device/host based atomics
  • Loading branch information
yosefe authored Sep 15, 2016
2 parents 945cab1 + ad5a6f5 commit 260db49
Show file tree
Hide file tree
Showing 26 changed files with 499 additions and 189 deletions.
12 changes: 9 additions & 3 deletions src/tools/info/tl_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@
#define PRINT_ATOMIC_CAP(_name, _cap_flags) \
if ((_cap_flags) & (UCT_IFACE_FLAG_##_name##32 | UCT_IFACE_FLAG_##_name##64)) { \
char *s = strduplower(#_name); \
char *domain = ""; \
if ((_cap_flags) & UCT_IFACE_FLAG_ATOMIC_CPU) { \
domain = ", cpu"; \
} else if ((_cap_flags) & UCT_IFACE_FLAG_ATOMIC_DEVICE) { \
domain = ", device"; \
} \
if (ucs_test_all_flags(_cap_flags, \
UCT_IFACE_FLAG_##_name##32 | UCT_IFACE_FLAG_##_name##64)) \
{ \
printf("# %12s: 32, 64 bit\n", s); \
printf("# %12s: 32, 64 bit%s\n", s, domain); \
} else { \
printf("# %12s: %d bit\n", s, \
((_cap_flags) & UCT_IFACE_FLAG_##_name##32) ? 32 : 64); \
printf("# %12s: %d bit%s\n", s, \
((_cap_flags) & UCT_IFACE_FLAG_##_name##32) ? 32 : 64, domain); \
} \
free(s); \
}
Expand Down
22 changes: 21 additions & 1 deletion src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

ucp_am_handler_t ucp_am_handlers[UCP_AM_ID_LAST] = {{0, NULL, NULL}};

static const char *ucp_atomic_modes[] = {
[UCP_ATOMIC_MODE_CPU] = "cpu",
[UCP_ATOMIC_MODE_DEVICE] = "device",
[UCP_ATOMIC_MODE_LAST] = NULL,
};

static ucs_config_field_t ucp_config_table[] = {
{"NET_DEVICES", "all",
Expand Down Expand Up @@ -84,12 +89,19 @@ static ucs_config_field_t ucp_config_table[] = {
"Estimation of buffer copy bandwidth",
ucs_offsetof(ucp_config_t, ctx.bcopy_bw), UCS_CONFIG_TYPE_MEMUNITS},

{"ATOMIC_MODE", "device",
"Atomic operations synchronization mode.\n"
" cpu - atomic operations are consistent with respect to the CPU.\n"
" device - atomic operations are performed on one of the transport devices,\n"
" and there is guarantee of consistency with respect to the CPU.",
ucs_offsetof(ucp_config_t, ctx.atomic_mode), UCS_CONFIG_TYPE_ENUM(ucp_atomic_modes)},

{"LOG_DATA", "0",
"Size of packet data that is dumped to the log system in debug mode (0 - nothing).",
ucs_offsetof(ucp_config_t, ctx.log_data_size), UCS_CONFIG_TYPE_MEMUNITS},

{"MAX_WORKER_NAME", UCS_PP_MAKE_STRING(UCP_WORKER_NAME_MAX),
"Maximal length of worker name. Affects the size of worker address.",
"Maximal length of worker name. Affects the size of worker address in debug builds.",
ucs_offsetof(ucp_config_t, ctx.max_worker_name), UCS_CONFIG_TYPE_UINT},

{NULL}
Expand Down Expand Up @@ -714,6 +726,14 @@ void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max,
}
}

uint64_t ucp_context_uct_atomic_iface_flags(ucp_context_h context)
{
return ((context->config.features & UCP_FEATURE_AMO32) ?
UCP_UCT_IFACE_ATOMIC32_FLAGS : 0) |
((context->config.features & UCP_FEATURE_AMO64) ?
UCP_UCT_IFACE_ATOMIC64_FLAGS : 0);
}

ucs_status_t ucp_context_query(ucp_context_h context, ucp_context_attr_t *attr)
{
attr->request_size = sizeof(ucp_request_t);
Expand Down
14 changes: 14 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ enum {
};


/**
* Atomic operations mode.
*/
typedef enum {
UCP_ATOMIC_MODE_CPU, /* Use CPU-based atomics */
UCP_ATOMIC_MODE_DEVICE, /* Use device-based atomics */
UCP_ATOMIC_MODE_LAST
} ucp_atomic_mode_t;


typedef struct ucp_context_config {
/** Threshold for switching UCP to buffered copy(bcopy) protocol */
size_t bcopy_thresh;
Expand All @@ -83,6 +93,8 @@ typedef struct ucp_context_config {
size_t log_data_size;
/** Maximal size of worker name for debugging */
unsigned max_worker_name;
/** Atomic mode */
ucp_atomic_mode_t atomic_mode;
} ucp_context_config_t;


Expand Down Expand Up @@ -200,4 +212,6 @@ extern ucp_am_handler_t ucp_am_handlers[];
void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max,
const void *data, size_t length);

uint64_t ucp_context_uct_atomic_iface_flags(ucp_context_h context);

#endif
112 changes: 112 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,115 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker,
return status;
}

static void ucp_worker_enable_atomic_tl(ucp_worker_h worker, const char *mode,
ucp_rsc_index_t rsc_index)
{
ucs_assert(rsc_index != UCP_NULL_RESOURCE);
ucs_trace("worker %p: using %s atomics on iface[%d]=" UCT_TL_RESOURCE_DESC_FMT,
worker, mode, rsc_index,
UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[rsc_index].tl_rsc));
worker->atomic_tls |= UCS_BIT(rsc_index);
}

static void ucp_worker_init_cpu_atomics(ucp_worker_h worker)
{
ucp_context_h context = worker->context;
ucp_rsc_index_t rsc_index;

/* Enable all interfaces which have host-based atomics */
for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) {
if (worker->iface_attrs[rsc_index].cap.flags & UCT_IFACE_FLAG_ATOMIC_CPU) {
ucp_worker_enable_atomic_tl(worker, "cpu", rsc_index);
}
}
}

static void ucp_worker_init_device_atomics(ucp_worker_h worker)
{
ucp_context_h context = worker->context;
ucp_address_iface_attr_t dummy_iface_attr;
ucp_tl_resource_desc_t *rsc, *best_rsc;
uct_iface_attr_t *iface_attr;
ucp_rsc_index_t rsc_index;
uint64_t iface_cap_flags;
double score, best_score;
ucp_rsc_index_t md_index;
uct_md_attr_t *md_attr;
uint64_t supp_tls;
uint8_t priority, best_priority;

iface_cap_flags = ucp_context_uct_atomic_iface_flags(context) |
UCT_IFACE_FLAG_ATOMIC_DEVICE;

dummy_iface_attr.bandwidth = 1e12;
dummy_iface_attr.cap_flags = -1;
dummy_iface_attr.overhead = 0;
dummy_iface_attr.priority = 0;

supp_tls = 0;
best_score = -1;
best_rsc = NULL;
best_priority = 0;

/* Select best interface for atomics device */
for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) {
rsc = &context->tl_rscs[rsc_index];
md_index = rsc->md_index;
md_attr = &context->md_attrs[md_index];
iface_attr = &worker->iface_attrs[rsc_index];

if (!(md_attr->cap.flags & UCT_MD_FLAG_REG) ||
!ucs_test_all_flags(iface_attr->cap.flags, iface_cap_flags))
{
continue;
}

supp_tls |= UCS_BIT(rsc_index);
priority = iface_attr->priority;

score = ucp_wireup_amo_score_func(md_attr, iface_attr, &dummy_iface_attr);
if ((score > best_score) ||
((score == best_score) && (priority > best_priority)))
{
best_rsc = rsc;
best_score = score;
best_priority = priority;
}
}

if (best_rsc == NULL) {
ucs_debug("worker %p: no support for atomics", worker);
return;
}

/* Enable atomics on all resources using same device as the "best" resource */
for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) {
rsc = &context->tl_rscs[rsc_index];
if ((supp_tls & UCS_BIT(rsc_index)) &&
(rsc->md_index == best_rsc->md_index) &&
!strncmp(rsc->tl_rsc.dev_name, best_rsc->tl_rsc.dev_name,
UCT_DEVICE_NAME_MAX))
{
ucp_worker_enable_atomic_tl(worker, "device", rsc_index);
}
}
}

static void ucp_worker_init_atomic_tls(ucp_worker_h worker)
{
ucp_context_h context = worker->context;

worker->atomic_tls = 0;

if (context->config.features & (UCP_FEATURE_AMO32|UCP_FEATURE_AMO64)) {
if (context->config.ext.atomic_mode == UCP_ATOMIC_MODE_CPU) {
ucp_worker_init_cpu_atomics(worker);
} else if (context->config.ext.atomic_mode == UCP_ATOMIC_MODE_DEVICE) {
ucp_worker_init_device_atomics(worker);
}
}
}

/* All the ucp endpoints will share the configurations. No need for every ep to
* have it's own configuration (to save memory footprint). Same config can be used
* by different eps.
Expand Down Expand Up @@ -378,6 +487,9 @@ ucs_status_t ucp_worker_create(ucp_context_h context, ucs_thread_mode_t thread_m
}
}

/* Select atomic resources */
ucp_worker_init_atomic_tls(worker);

*worker_p = worker;
return UCS_OK;

Expand Down
16 changes: 16 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@

KHASH_MAP_INIT_INT64(ucp_worker_ep_hash, ucp_ep_t *);


enum {
UCP_UCT_IFACE_ATOMIC32_FLAGS =
UCT_IFACE_FLAG_ATOMIC_ADD32 |
UCT_IFACE_FLAG_ATOMIC_FADD32 |
UCT_IFACE_FLAG_ATOMIC_SWAP32 |
UCT_IFACE_FLAG_ATOMIC_CSWAP32,
UCP_UCT_IFACE_ATOMIC64_FLAGS =
UCT_IFACE_FLAG_ATOMIC_ADD64 |
UCT_IFACE_FLAG_ATOMIC_FADD64 |
UCT_IFACE_FLAG_ATOMIC_SWAP64 |
UCT_IFACE_FLAG_ATOMIC_CSWAP64
};


/**
* UCP worker wake-up context.
*/
Expand All @@ -36,6 +51,7 @@ typedef struct ucp_worker {
uct_worker_h uct; /* UCT worker handle */
ucs_mpool_t req_mp; /* Memory pool for requests */
ucp_worker_wakeup_t wakeup; /* Wakeup-related context */
uint64_t atomic_tls; /* Which resources can be used for atomics */

int inprogress;
char name[UCP_WORKER_NAME_MAX]; /* Worker name */
Expand Down
16 changes: 12 additions & 4 deletions src/ucp/wireup/address.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ ucp_address_gather_devices(ucp_worker_h worker, uint64_t tl_bitmap, int has_ep,

iface_attr = &worker->iface_attrs[i];
if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
dev->tl_addrs_size += iface_attr->iface_addr_len;
dev->tl_addrs_size += iface_attr->iface_addr_len;
} else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
if (has_ep) {
dev->tl_addrs_size += iface_attr->ep_addr_len;
Expand Down Expand Up @@ -219,11 +219,18 @@ ucp_address_pack_ep_address(ucp_ep_h ep, ucp_rsc_index_t tl_index,
}

static void ucp_address_pack_iface_attr(ucp_address_packed_iface_attr_t *packed,
const uct_iface_attr_t *iface_attr)
const uct_iface_attr_t *iface_attr,
int enable_atomics)
{
uint32_t packed_flag;
uint64_t cap_flags;
uint64_t bit;

cap_flags = iface_attr->cap.flags;
if (!enable_atomics) {
cap_flags &= ~(UCP_UCT_IFACE_ATOMIC32_FLAGS | UCP_UCT_IFACE_ATOMIC64_FLAGS);
}

packed->prio_cap_flags = ((uint8_t)iface_attr->priority);
packed->overhead = iface_attr->overhead;
packed->bandwidth = iface_attr->bandwidth;
Expand All @@ -233,7 +240,7 @@ static void ucp_address_pack_iface_attr(ucp_address_packed_iface_attr_t *packed,
bit = 1;
while (UCP_ADDRESS_IFACE_FLAGS & ~(bit - 1)) {
if (UCP_ADDRESS_IFACE_FLAGS & bit) {
if (iface_attr->cap.flags & bit) {
if (cap_flags & bit) {
packed->prio_cap_flags |= packed_flag;
}
packed_flag <<= 1;
Expand Down Expand Up @@ -336,7 +343,8 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep,
ptr += sizeof(uint16_t);

/* Transport information */
ucp_address_pack_iface_attr(ptr, &worker->iface_attrs[i]);
ucp_address_pack_iface_attr(ptr, &worker->iface_attrs[i],
worker->atomic_tls & UCS_BIT(i));
ptr += sizeof(ucp_address_packed_iface_attr_t);

/* Transport address length */
Expand Down
7 changes: 2 additions & 5 deletions src/ucp/wireup/address.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ enum {
UCT_IFACE_FLAG_PUT_BCOPY |
UCT_IFACE_FLAG_GET_BCOPY |
UCT_IFACE_FLAG_GET_ZCOPY |
UCT_IFACE_FLAG_ATOMIC_ADD32 |
UCT_IFACE_FLAG_ATOMIC_FADD32 |
UCT_IFACE_FLAG_ATOMIC_SWAP32 |
UCT_IFACE_FLAG_ATOMIC_CSWAP32 |
UCT_IFACE_FLAG_ATOMIC_ADD64 |
UCP_UCT_IFACE_ATOMIC32_FLAGS |
UCP_UCT_IFACE_ATOMIC64_FLAGS |
UCT_IFACE_FLAG_ATOMIC_FADD64 |
UCT_IFACE_FLAG_ATOMIC_SWAP64 |
UCT_IFACE_FLAG_ATOMIC_CSWAP64 |
Expand Down
Loading

0 comments on commit 260db49

Please sign in to comment.