Skip to content

Commit

Permalink
UCP:rndv multirail support infrastructure + protocol
Browse files Browse the repository at this point in the history
- added infrastructure to support multi-rail on rndv-RMA:
  - updated rndv-rma lane to array
  - added memory handles to store registered buffer and remote keys
  • Loading branch information
Sergey Oblomov committed Oct 6, 2017
1 parent 0aa8eb0 commit 3dd33cc
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 56 deletions.
35 changes: 18 additions & 17 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key)
memset(key, 0, sizeof(*key));
key->num_lanes = 0;
key->am_lane = UCP_NULL_LANE;
key->rndv_lane = UCP_NULL_LANE;
key->wireup_lane = UCP_NULL_LANE;
key->tag_lane = UCP_NULL_LANE;
key->reachable_md_map = 0;
key->err_mode = UCP_ERR_HANDLING_MODE_NONE;
key->status = UCS_OK;
memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes));
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
memset(key->rndv_lanes, UCP_NULL_LANE, sizeof(key->rndv_lanes));
memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes));
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
}

ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,
Expand Down Expand Up @@ -150,7 +150,7 @@ ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid,
key.lanes[0].rsc_index = UCP_NULL_RESOURCE;
key.lanes[0].dst_md_index = UCP_NULL_RESOURCE;
key.am_lane = 0;
key.rndv_lane = 0;
key.rndv_lanes[0] = 0;
key.wireup_lane = 0;

ep->cfg_index = ucp_worker_get_ep_config(worker, &key);
Expand Down Expand Up @@ -694,16 +694,16 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane;


if ((key1->num_lanes != key2->num_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->rndv_lane != key2->rndv_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_lane != key2->wireup_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->status != key2->status))
if ((key1->num_lanes != key2->num_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
memcmp(key1->rndv_lanes, key2->rndv_lanes, sizeof(key1->rndv_lanes)) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_lane != key2->wireup_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->status != key2->status))
{
return 0;
}
Expand Down Expand Up @@ -992,7 +992,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
if (!ucp_ep_is_tag_offload_enabled(config)) {
/* Tag offload is disabled, AM will be used for all
* tag-matching protocols */
ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lane,
ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lanes[0],
UCT_IFACE_FLAG_GET_ZCOPY,
max_rndv_thresh);
config->tag.eager = config->am;
Expand Down Expand Up @@ -1186,8 +1186,9 @@ void ucp_ep_config_lane_info_str(ucp_context_h context,
p += strlen(p);
}

if (lane == key->rndv_lane) {
snprintf(p, endp - p, " zcopy_rndv");
prio = ucp_ep_config_get_rma_prio(key->rndv_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " zcopy_rndv#%d", prio);
p += strlen(p);
}

Expand Down
4 changes: 3 additions & 1 deletion src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,12 @@ typedef struct ucp_ep_config_key {
} lanes[UCP_MAX_LANES];

ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */
ucp_lane_index_t rndv_lane; /* Lane for zcopy Rendezvous (can be NULL) */
ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */
ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */

/* Lane for zcopy Rendezvous (can be NULL), sorted by priority, highest first */
ucp_lane_index_t rndv_lanes[UCP_MAX_LANES];

/* Lanes for remote memory access, sorted by priority, highest first */
ucp_lane_index_t rma_lanes[UCP_MAX_LANES];

Expand Down
24 changes: 15 additions & 9 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ static inline ucp_lane_index_t ucp_ep_get_wireup_msg_lane(ucp_ep_h ep)
return (lane == UCP_NULL_LANE) ? ucp_ep_get_am_lane(ep) : lane;
}

static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep)
static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep, int idx)
{
ucs_assert(ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE);
return ucp_ep_config(ep)->key.rndv_lane;
ucs_assert(ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE);
return ucp_ep_config(ep)->key.rndv_lanes[idx];
}

static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep)
Expand All @@ -44,9 +44,15 @@ static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep)
return ucp_ep_config(ep)->key.tag_lane;
}

static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep)
static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep, int idx)
{
return ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE;
return ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE;
}

static inline int ucp_ep_is_rndv_mrail_present(ucp_ep_h ep)
{
return (ucp_ep_config(ep)->key.rndv_lanes[0] != UCP_NULL_LANE &&
ucp_ep_config(ep)->key.rndv_lanes[1] != UCP_NULL_LANE);
}

static inline int ucp_ep_is_tag_offload_enabled(ucp_ep_config_t *config)
Expand All @@ -65,9 +71,9 @@ static inline uct_ep_h ucp_ep_get_am_uct_ep(ucp_ep_h ep)
return ep->uct_eps[ucp_ep_get_am_lane(ep)];
}

static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep)
static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep, int idx)
{
return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep)];
return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep, idx)];
}

static inline uct_ep_h ucp_ep_get_tag_uct_ep(ucp_ep_h ep)
Expand Down Expand Up @@ -120,9 +126,9 @@ static inline const uct_md_attr_t* ucp_ep_md_attr(ucp_ep_h ep, ucp_lane_index_t
return &context->tl_mds[ucp_ep_md_index(ep, lane)].attr;
}

static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep)
static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep, int idx)
{
return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep))->cap.flags;
return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep, idx))->cap.flags;
}

static inline const char* ucp_ep_peer_name(ucp_ep_h ep)
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ void ucp_iov_buffer_memh_dereg(uct_md_h uct_md, uct_mem_h *memh,
UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
(context, rsc_index, buffer, length, datatype, state),
ucp_context_t *context, ucp_rsc_index_t rsc_index, void *buffer,
size_t length, ucp_datatype_t datatype, ucp_dt_state_t *state)
size_t length, ucp_datatype_t datatype,
ucp_dt_state_t *state)
{
ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index;
uct_md_h uct_md = context->tl_mds[mdi].md;
Expand Down
9 changes: 8 additions & 1 deletion src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ struct ucp_request {
uintptr_t remote_request; /* pointer to the sender's send request */
uct_rkey_bundle_t rkey_bundle;
ucp_request_t *rreq; /* receive request on the recv side */
unsigned use_mrail;
unsigned rail_idx;
struct {
ucp_lane_index_t lane;
uct_rkey_bundle_t rkey_bundle;
} mrail[UCP_MAX_RAILS];
} rndv_get;

struct {
Expand Down Expand Up @@ -196,7 +202,8 @@ void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane);

ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
void *buffer, size_t length,
ucp_datatype_t datatype, ucp_dt_state_t *state);
ucp_datatype_t datatype,
ucp_dt_state_t *state);

void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
ucp_datatype_t datatype, ucp_dt_state_t *state);
Expand Down
66 changes: 66 additions & 0 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,69 @@ static UCS_F_ALWAYS_INLINE void ucp_request_send_stat(ucp_request_t *req)
UCP_EP_STAT_TAG_OP(req->send.ep, EAGER);
}
}

static UCS_F_ALWAYS_INLINE void
ucp_request_clear_rails(ucp_dt_state_t *state) {
int i;
for(i = 0; i < UCP_MAX_RAILS; i++) {
state->dt.mrail[i].memh = UCT_MEM_HANDLE_NULL;
state->dt.mrail[i].lane = UCP_NULL_LANE;
}
}

static UCS_F_ALWAYS_INLINE int
ucp_request_is_empty_rail(ucp_dt_state_t *state, int rail) {
return state->dt.mrail[rail].memh == UCT_MEM_HANDLE_NULL ||
state->dt.mrail[rail].lane == UCP_NULL_LANE;
}

static UCS_F_ALWAYS_INLINE int
ucp_request_have_rails(ucp_dt_state_t *state) {
return !ucp_request_is_empty_rail(state, 0);
}

static inline int ucp_request_mrail_reg(ucp_request_t *req)
{
ucp_ep_t *ep = req->send.ep;
ucp_dt_state_t *state = &req->send.state;
int cnt = 0;
ucs_status_t status;
int i;
ucp_lane_index_t lane;

ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype));

ucp_request_clear_rails(state);

for (i = 0; ucp_ep_is_rndv_lane_present(ep, i) && i < UCP_MAX_RAILS; i++) {
lane = ucp_ep_get_rndv_get_lane(ep, i);

if (ucp_ep_rndv_md_flags(ep, lane) & UCT_MD_FLAG_NEED_RKEY) {
status = uct_md_mem_reg(ucp_ep_md(ep, lane),
(void *)req->send.buffer, req->send.length,
UCT_MD_MEM_ACCESS_RMA, &state->dt.mrail[cnt].memh);
ucs_assert_always(status == UCS_OK);
state->dt.mrail[cnt].lane = lane;
cnt++;
}
}

return cnt;
}

static inline void ucp_request_mrail_dereg(ucp_request_t *req)
{
ucp_dt_state_t *state = &req->send.state;
ucs_status_t status;
int i;

for (i = 0; i < UCP_MAX_RAILS && !ucp_request_is_empty_rail(&req->send.state, i); i++) {
status = uct_md_mem_dereg(ucp_ep_md(req->send.ep, state->dt.mrail[i].lane),
state->dt.mrail[i].memh);
ucs_assert_always(status == UCS_OK);
}

ucp_request_clear_rails(state);
}


3 changes: 2 additions & 1 deletion src/ucp/core/ucp_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ typedef ucp_rsc_index_t ucp_md_index_t;
UCP_UINT_TYPE(UCP_MD_INDEX_BITS) ucp_md_map_t;

/* Lanes */
#define UCP_MAX_LANES 8
#define UCP_MAX_LANES 16
#define UCP_MAX_RAILS 8
#define UCP_NULL_LANE ((ucp_lane_index_t)-1)
typedef uint8_t ucp_lane_index_t;
UCP_UINT_TYPE(UCP_MAX_LANES) ucp_lane_map_t;
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
ucp_ep_config_key_t key = ucp_ep_config(ucp_ep)->key;
key.am_lane = 0;
key.wireup_lane = 0;
key.rndv_lane = 0;
key.rndv_lanes[0] = 0;
key.tag_lane = 0;
key.amo_lanes[0] = 0;
key.rma_lanes[0] = 0;
Expand Down
11 changes: 9 additions & 2 deletions src/ucp/dt/dt.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "dt_contig.h"
#include "dt_iov.h"
#include "dt_generic.h"
#include "ucp/core/ucp_types.h"

#include <uct/api/uct.h>
#include <ucs/debug/profile.h>
Expand All @@ -24,8 +25,14 @@ typedef struct ucp_dt_state {
size_t offset; /* Total offset in overall payload. */
union {
struct {
uct_mem_h memh;
} contig;
struct {
uct_mem_h memh;
} contig;
struct {
ucp_lane_index_t lane;
uct_mem_h memh;
} mrail[UCP_MAX_RAILS];
};
struct {
size_t iov_offset; /* Offset in the IOV item */
size_t iovcnt_offset; /* The IOV item to start copy */
Expand Down
1 change: 1 addition & 0 deletions src/ucp/rma/basic_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ ucp_rma_request_init(ucp_request_t *req, ucp_ep_h ep, const void *buffer,
if (length < zcopy_thresh) {
req->send.uct_comp.func = ucp_rma_request_bcopy_completion;
req->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL;
ucp_request_clear_rails(&req->send.state);
return UCS_OK;
} else {
req->send.uct_comp.func = ucp_rma_request_zcopy_completion;
Expand Down
Loading

0 comments on commit 3dd33cc

Please sign in to comment.