Skip to content

Commit

Permalink
UCX/RNDV: multirail - updated EP configuration
Browse files Browse the repository at this point in the history
- added array of lanes to be used by rndv-mrail
- added array of memory handles for multirail
  • Loading branch information
Sergey Oblomov committed Nov 9, 2017
1 parent dc133c0 commit cb9336b
Show file tree
Hide file tree
Showing 16 changed files with 221 additions and 96 deletions.
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ static ucs_config_field_t ucp_config_table[] = {
"the eager_zcopy protocol",
ucs_offsetof(ucp_config_t, ctx.rndv_perf_diff), UCS_CONFIG_TYPE_DOUBLE},

{"MAX_RNDV_LANES", "1",
"Set max multirail-get rendezvous lane numbers",
ucs_offsetof(ucp_config_t, ctx.max_rndv_lanes), UCS_CONFIG_TYPE_UINT},

{"ZCOPY_THRESH", "auto",
"Threshold for switching from buffer copy to zero copy protocol",
ucs_offsetof(ucp_config_t, ctx.zcopy_thresh), UCS_CONFIG_TYPE_MEMUNITS},
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ typedef struct ucp_context_config {
int use_mt_mutex;
/** On-demand progress */
int adaptive_progress;
/** Rendezvous multirail support */
unsigned max_rndv_lanes;
} ucp_context_config_t;


Expand Down
50 changes: 27 additions & 23 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key)
{
memset(key, 0, sizeof(*key));
key->num_lanes = 0;
key->num_rndv_lanes = 0;
key->am_lane = UCP_NULL_LANE;
key->rndv_lane = UCP_NULL_LANE;
key->wireup_lane = UCP_NULL_LANE;
key->tag_lane = UCP_NULL_LANE;
key->reachable_md_map = 0;
key->err_mode = UCP_ERR_HANDLING_MODE_NONE;
key->status = UCS_OK;
memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes));
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
memset(key->rndv_lanes, UCP_NULL_LANE, sizeof(key->rndv_lanes));
memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes));
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
}

ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,
Expand Down Expand Up @@ -150,7 +151,7 @@ ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid,
key.lanes[0].rsc_index = UCP_NULL_RESOURCE;
key.lanes[0].dst_md_index = UCP_NULL_RESOURCE;
key.am_lane = 0;
key.rndv_lane = 0;
key.rndv_lanes[0] = 0;
key.wireup_lane = 0;

ep->cfg_index = ucp_worker_get_ep_config(worker, &key);
Expand Down Expand Up @@ -481,16 +482,17 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane;


if ((key1->num_lanes != key2->num_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->rndv_lane != key2->rndv_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_lane != key2->wireup_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->status != key2->status))
if ((key1->num_lanes != key2->num_lanes) ||
(key1->num_rndv_lanes != key2->num_rndv_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
memcmp(key1->rndv_lanes, key2->rndv_lanes, sizeof(key1->rndv_lanes)) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_lane != key2->wireup_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->status != key2->status))
{
return 0;
}
Expand Down Expand Up @@ -779,7 +781,8 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
if (!ucp_ep_is_tag_offload_enabled(config)) {
/* Tag offload is disabled, AM will be used for all
* tag-matching protocols */
ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lane,
/* TODO: set threshold level based on all available lanes */
ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lanes[0],
UCT_IFACE_FLAG_GET_ZCOPY,
max_rndv_thresh);
config->tag.eager = config->am;
Expand All @@ -793,7 +796,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)

/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
if (ucp_ep_config_get_rma_prio(config->key.rma_lanes, lane) == -1) {
if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
}

Expand Down Expand Up @@ -897,8 +900,8 @@ static void ucp_ep_config_print_rma_proto(FILE *stream, const char *name,
fprintf(stream, "..(inf)\n");
}

int ucp_ep_config_get_rma_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane)
int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane)
{
int prio;
for (prio = 0; prio < UCP_MAX_LANES; ++prio) {
Expand Down Expand Up @@ -956,13 +959,13 @@ void ucp_ep_config_lane_info_str(ucp_context_h context,
snprintf(p, endp - p, "md[%d]", key->lanes[lane].dst_md_index);
p += strlen(p);

prio = ucp_ep_config_get_rma_prio(key->rma_lanes, lane);
prio = ucp_ep_config_get_multi_lane_prio(key->rma_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " rma#%d", prio);
p += strlen(p);
}

prio = ucp_ep_config_get_rma_prio(key->amo_lanes, lane);
prio = ucp_ep_config_get_multi_lane_prio(key->amo_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " amo#%d", prio);
p += strlen(p);
Expand All @@ -973,8 +976,9 @@ void ucp_ep_config_lane_info_str(ucp_context_h context,
p += strlen(p);
}

if (lane == key->rndv_lane) {
snprintf(p, endp - p, " zcopy_rndv");
prio = ucp_ep_config_get_multi_lane_prio(key->rndv_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " zcopy_rndv#%d", prio);
p += strlen(p);
}

Expand Down Expand Up @@ -1028,7 +1032,7 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker,

if (context->config.features & UCP_FEATURE_RMA) {
for (lane = 0; lane < config->key.num_lanes; ++lane) {
if (ucp_ep_config_get_rma_prio(config->key.rma_lanes, lane) == -1) {
if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
}
ucp_ep_config_print_rma_proto(stream, "put", lane,
Expand Down
10 changes: 7 additions & 3 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ typedef struct ucp_ep_config_key {

ucp_lane_index_t num_lanes; /* Number of active lanes */

ucp_lane_index_t num_rndv_lanes; /* Number of rendezvous lanes */

struct {
ucp_rsc_index_t rsc_index; /* Resource index */
ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy
Expand All @@ -71,10 +73,12 @@ typedef struct ucp_ep_config_key {
} lanes[UCP_MAX_LANES];

ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */
ucp_lane_index_t rndv_lane; /* Lane for zcopy Rendezvous (can be NULL) */
ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */
ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */

/* Lane for zcopy rendezvous (can be NULL) */
ucp_lane_index_t rndv_lanes[UCP_MAX_LANES];

/* Lanes for remote memory access, sorted by priority, highest first */
ucp_lane_index_t rma_lanes[UCP_MAX_LANES];

Expand Down Expand Up @@ -242,8 +246,8 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config);
int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);

int ucp_ep_config_get_rma_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane);
int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane);

size_t ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt,
const uct_linear_growth_t *reg_cost,
Expand Down
23 changes: 14 additions & 9 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ static inline ucp_lane_index_t ucp_ep_get_wireup_msg_lane(ucp_ep_h ep)
return (lane == UCP_NULL_LANE) ? ucp_ep_get_am_lane(ep) : lane;
}

static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep)
static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep, ucp_lane_index_t idx)
{
ucs_assert(ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE);
return ucp_ep_config(ep)->key.rndv_lane;
ucs_assert(ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE);
return ucp_ep_config(ep)->key.rndv_lanes[idx];
}

static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep)
Expand All @@ -44,9 +44,14 @@ static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep)
return ucp_ep_config(ep)->key.tag_lane;
}

static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep)
static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep, ucp_lane_index_t idx)
{
return ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE;
return ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE;
}

static inline int ucp_ep_rndv_num_lanes(ucp_ep_h ep)
{
return ucp_ep_config(ep)->key.num_rndv_lanes;
}

static inline int ucp_ep_is_tag_offload_enabled(ucp_ep_config_t *config)
Expand All @@ -65,9 +70,9 @@ static inline uct_ep_h ucp_ep_get_am_uct_ep(ucp_ep_h ep)
return ep->uct_eps[ucp_ep_get_am_lane(ep)];
}

static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep)
static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep, ucp_lane_index_t idx)
{
return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep)];
return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep, idx)];
}

static inline uct_ep_h ucp_ep_get_tag_uct_ep(ucp_ep_h ep)
Expand Down Expand Up @@ -120,9 +125,9 @@ static inline const uct_md_attr_t* ucp_ep_md_attr(ucp_ep_h ep, ucp_lane_index_t
return &context->tl_mds[ucp_ep_md_index(ep, lane)].attr;
}

static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep)
static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep, ucp_lane_index_t idx)
{
return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep))->cap.flags;
return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep, idx))->cap.flags;
}

static inline const char* ucp_ep_peer_name(ucp_ep_h ep)
Expand Down
8 changes: 5 additions & 3 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
status = uct_md_mem_reg(uct_md, buffer, length, UCT_MD_MEM_ACCESS_RMA,
&state->dt.contig.memh);
&state->dt.contig[0].memh);
break;
case UCP_DATATYPE_IOV:
iovcnt = state->dt.iov.iovcnt;
Expand Down Expand Up @@ -245,8 +245,9 @@ UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg,

switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
if (state->dt.contig.memh != UCT_MEM_HANDLE_NULL) {
uct_md_mem_dereg(uct_md, state->dt.contig.memh);
if (state->dt.contig[0].memh != UCT_MEM_HANDLE_NULL) {
uct_md_mem_dereg(uct_md, state->dt.contig[0].memh);
state->dt.contig[0].memh = UCT_MEM_HANDLE_NULL;
}
break;
case UCP_DATATYPE_IOV:
Expand Down Expand Up @@ -344,3 +345,4 @@ ucp_request_send_start(ucp_request_t *req, ssize_t max_short,

return UCS_ERR_NO_PROGRESS;
}

13 changes: 7 additions & 6 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ struct ucp_request {
} proxy;

struct {
uint64_t remote_address; /* address of the sender's data buffer */
uintptr_t remote_request; /* pointer to the sender's send request */
uct_rkey_bundle_t rkey_bundle;
ucp_request_t *rreq; /* receive request on the recv side */
uint64_t remote_address; /* address of the sender's data buffer */
uintptr_t remote_request; /* pointer to the sender's send request */
uct_rkey_bundle_t rkey_bundle;
ucp_request_t *rreq; /* receive request on the recv side */
} rndv_get;

struct {
Expand Down Expand Up @@ -206,6 +206,7 @@ typedef struct ucp_recv_desc {


extern ucs_mpool_ops_t ucp_request_mpool_ops;
extern ucs_mpool_ops_t ucp_rndv_get_mpool_ops;


int ucp_request_pending_add(ucp_request_t *req, ucs_status_t *req_status);
Expand All @@ -215,8 +216,8 @@ ucs_status_t ucp_request_send_buffer_reg(ucp_request_t *req, ucp_lane_index_t la
void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane);

ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
void *buffer, size_t length,
ucp_datatype_t datatype, ucp_dt_state_t *state);
void *buffer, size_t length, ucp_datatype_t datatype,
ucp_dt_state_t *state);

void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
ucp_datatype_t datatype, ucp_dt_state_t *state);
Expand Down
9 changes: 8 additions & 1 deletion src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ ucp_request_send_state_reset(ucp_request_t *req,
/* Fall through */
case UCP_REQUEST_SEND_PROTO_RNDV_GET:
if (UCP_DT_IS_CONTIG(req->send.datatype)) {
req->send.state.dt.dt.contig.memh = UCT_MEM_HANDLE_NULL;
ucp_dt_clear_rndv_lanes(&req->send.state.dt);
}
/* Fall through */
case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
Expand Down Expand Up @@ -318,3 +318,10 @@ static UCS_F_ALWAYS_INLINE void ucp_request_send_tag_stat(ucp_request_t *req)
UCP_EP_STAT_TAG_OP(req->send.ep, EAGER);
}
}

static UCS_F_ALWAYS_INLINE
uct_rkey_bundle_t *ucp_tag_rndv_rkey(ucp_request_t *req)
{
return &req->send.rndv_get.rkey_bundle;
}

1 change: 1 addition & 0 deletions src/ucp/core/ucp_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ UCP_UINT_TYPE(UCP_MD_INDEX_BITS) ucp_md_map_t;

/* Lanes */
#define UCP_MAX_LANES 8
#define UCP_MAX_RNDV_LANES 4
#define UCP_NULL_LANE ((ucp_lane_index_t)-1)
typedef uint8_t ucp_lane_index_t;
UCP_UINT_TYPE(UCP_MAX_LANES) ucp_lane_map_t;
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
ucp_ep_config_key_t key = ucp_ep_config(ucp_ep)->key;
key.am_lane = 0;
key.wireup_lane = 0;
key.rndv_lane = 0;
key.rndv_lanes[0] = 0;
key.tag_lane = 0;
key.amo_lanes[0] = 0;
key.rma_lanes[0] = 0;
Expand Down
26 changes: 24 additions & 2 deletions src/ucp/dt/dt.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "dt_iov.h"
#include "dt_generic.h"

#include <ucp/core/ucp_types.h>
#include <uct/api/uct.h>
#include <ucs/debug/profile.h>
#include <string.h>
Expand All @@ -24,8 +25,8 @@ typedef struct ucp_dt_state {
size_t offset; /* Total offset in overall payload. */
union {
struct {
uct_mem_h memh;
} contig;
uct_mem_h memh;
} contig[UCP_MAX_RNDV_LANES];
struct {
size_t iov_offset; /* Offset in the IOV item */
size_t iovcnt_offset; /* The IOV item to start copy */
Expand Down Expand Up @@ -119,4 +120,25 @@ ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size,
}
}

static UCS_F_ALWAYS_INLINE void
ucp_dt_clear_rndv_lanes(ucp_dt_state_t *state)
{
int i;
for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
state->dt.contig[i].memh = UCT_MEM_HANDLE_NULL;
}
}

static UCS_F_ALWAYS_INLINE int
ucp_dt_is_empty_rndv_lane(ucp_dt_state_t *state, int idx)
{
return state->dt.contig[idx].memh == UCT_MEM_HANDLE_NULL;
}

static UCS_F_ALWAYS_INLINE int
ucp_dt_have_rndv_lanes(ucp_dt_state_t *state)
{
return !ucp_dt_is_empty_rndv_lane(state, 0);
}

#endif
2 changes: 1 addition & 1 deletion src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void ucp_dt_iov_copy_uct(uct_iov_t *iov, size_t *iovcnt, size_t max_dst_iov,
case UCP_DATATYPE_CONTIG:
iov[0].buffer = (void *)src_iov + state->offset;
iov[0].length = length_max;
iov[0].memh = state->dt.contig.memh;
iov[0].memh = state->dt.contig[0].memh;
iov[0].stride = 0;
iov[0].count = 1;

Expand Down
Loading

0 comments on commit cb9336b

Please sign in to comment.