Skip to content

Commit

Permalink
UCP:rndv multirail support infrastructure + protocol (6)
Browse files Browse the repository at this point in the history
- functions/values renaming, minor optimization
  • Loading branch information
Sergey Oblomov committed Nov 7, 2017
1 parent ab4b3ff commit 1d7597f
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 82 deletions.
21 changes: 9 additions & 12 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,41 +353,38 @@ ucp_request_send_start(ucp_request_t *req, ssize_t max_short,
return UCS_ERR_NO_PROGRESS;
}

int ucp_request_rndv_reg(ucp_request_t *req)
void ucp_request_rndv_mem_reg(ucp_request_t *req)
{
ucp_ep_t *ep = req->send.ep;
ucp_dt_state_t *state = &req->send.state.dt;
int cnt = 0;
ucp_ep_t *ep = req->send.ep;
ucp_dt_state_t *state = &req->send.state.dt;
ucs_status_t status;
int i;
ucp_lane_index_t lane;

ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype));

ucp_dt_clear_rndv_lanes(state);

for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) {
lane = ucp_ep_get_rndv_get_lane(ep, i);

if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_RKEY) {
if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_MEMH) {
status = uct_md_mem_reg(ucp_ep_md(ep, lane),
(void *)req->send.buffer, req->send.length,
UCT_MD_MEM_ACCESS_RMA, &state->dt.contig[i].memh);
ucs_assert_always(status == UCS_OK);
cnt++;
} else {
state->dt.contig[i].memh = UCT_MEM_HANDLE_NULL;
}
}

return cnt;
req->send.reg_rsc = UCP_NULL_RESOURCE;
}

void ucp_request_rndv_dereg(ucp_request_t *req)
void ucp_request_rndv_mem_dereg(ucp_request_t *req)
{
ucp_dt_state_t *state = &req->send.state.dt;
ucs_status_t status;
int i;

for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
for (i = 0; i < ucp_ep_rndv_num_lanes(req->send.ep); i++) {
if (state->dt.contig[i].memh != UCT_MEM_HANDLE_NULL) {
status = uct_md_mem_dereg(ucp_ep_md(req->send.ep,
ucp_ep_get_rndv_get_lane(req->send.ep, i)),
Expand Down
8 changes: 4 additions & 4 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ enum {
* available - only one rkey is processed
*/
typedef struct ucp_rndv_get_rkey {
ucp_lane_index_t lane_num; /* number of rkeys obtained from peer */
ucp_lane_index_t lane_idx;
ucp_lane_index_t num_lanes; /* number of rkeys obtained from peer */
ucp_lane_index_t lane_idx; /* rendezvous lane index used for the next op */
uct_rkey_bundle_t rkey_bundle[UCP_MAX_RNDV_LANES];
} ucp_rndv_get_rkey_t;

Expand Down Expand Up @@ -239,7 +239,7 @@ ucs_status_t ucp_request_send_start(ucp_request_t *req, ssize_t max_short,
size_t zcopy_thresh, size_t multi_thresh,
size_t rndv_thresh, const ucp_proto_t *proto);

int ucp_request_rndv_reg(ucp_request_t *req);
void ucp_request_rndv_dereg(ucp_request_t *req);
void ucp_request_rndv_mem_reg(ucp_request_t *req);
void ucp_request_rndv_mem_dereg(ucp_request_t *req);

#endif
4 changes: 2 additions & 2 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ ucp_request_rndv_get_create(ucp_request_t *req)
req->send.rndv_get.rkey = ucs_mpool_get_inline(&(req->send.ep->worker)->rndv_get_mp);
ucs_assert_always(req->send.rndv_get.rkey != NULL);

req->send.rndv_get.rkey->lane_idx = 0;
req->send.rndv_get.rkey->lane_num = 0;
req->send.rndv_get.rkey->lane_idx = 0;
req->send.rndv_get.rkey->num_lanes = 0;

for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
ucp_tag_rndv_rkey(req, i)->rkey = UCT_INVALID_RKEY;
Expand Down
57 changes: 22 additions & 35 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@ static int ucp_tag_rndv_is_get_op_possible(ucp_ep_h ep, ucp_request_t *req)
return 0;
}

if (ucp_ep_rndv_num_lanes(ep) == 0) {
return 0;
} else {
for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) {
md_flags = ucp_ep_rndv_md_flags(ep, i);
rkey = ucp_tag_rndv_rkey(req, i)->rkey;
if ((md_flags & UCT_MD_FLAG_NEED_RKEY) && (rkey == UCT_INVALID_RKEY)) {
return 0;
}
for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) {
md_flags = ucp_ep_rndv_md_flags(ep, i);
rkey = ucp_tag_rndv_rkey(req, i)->rkey;
if ((md_flags & UCT_MD_FLAG_NEED_RKEY) && (rkey == UCT_INVALID_RKEY)) {
return 0;
}
}

Expand All @@ -48,7 +44,7 @@ static void ucp_rndv_rma_request_send_buffer_dereg(ucp_request_t *sreq)
* (state->dt.contig.memh != UCT_MEM_HANDLE_NULL)
*/
if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
ucp_request_rndv_dereg(sreq);
ucp_request_rndv_mem_dereg(sreq);
}
}

Expand Down Expand Up @@ -78,32 +74,26 @@ size_t ucp_tag_rndv_pack_rkey(ucp_request_t *sreq, ucp_lane_index_t lane,

static size_t ucp_tag_rndv_pack_rkeys(ucp_request_t *sreq, void *rkey_buf, uint16_t *flags)
{
ucp_ep_t *ep = sreq->send.ep;
ucp_dt_state_t *state = &sreq->send.state.dt;
size_t packet = 0;
int cnt = 0;
ucp_ep_t *ep = sreq->send.ep;
ucp_dt_state_t *state = &sreq->send.state.dt;
size_t packet = 0;
int i;
ucp_lane_index_t lane;

ucs_assert(UCP_DT_IS_CONTIG(sreq->send.datatype));

cnt = ucp_request_rndv_reg(sreq);
ucs_assert_always(cnt <= UCP_MAX_RNDV_LANES);
ucp_request_rndv_mem_reg(sreq);

if (cnt) {
for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) {
ucs_assert_always(!ucp_dt_is_empty_rndv_lane(state, i));

lane = ucp_ep_get_rndv_get_lane(ep, i);
for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) {
lane = ucp_ep_get_rndv_get_lane(ep, i);

if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_RKEY) {
UCS_PROFILE_CALL(uct_md_mkey_pack, ucp_ep_md(ep, lane),
state->dt.contig[i].memh, rkey_buf + packet);
packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size;
}
if (ucp_ep_rndv_md_flags(ep, i) & UCT_MD_FLAG_NEED_RKEY) {
UCS_PROFILE_CALL(uct_md_mkey_pack, ucp_ep_md(ep, lane),
state->dt.contig[i].memh, rkey_buf + packet);
packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size;
}
*flags |= UCP_RNDV_RTS_FLAG_PACKED_RKEY;
}
*flags |= UCP_RNDV_RTS_FLAG_PACKED_RKEY;

ucs_assert_always(packet <= ucp_ep_config(ep)->am.max_bcopy);

Expand All @@ -129,7 +119,7 @@ static void ucp_tag_rndv_unpack_rkeys(ucp_request_t *req, void *rkey_buf, uint16

ucp_request_rndv_get_create(req);

req->send.rndv_get.rkey->lane_num = rkeys;
req->send.rndv_get.rkey->num_lanes = rkeys;

for (i = 0; i < rkeys; i++) {
lane = ucp_ep_get_rndv_get_lane(ep, i);
Expand Down Expand Up @@ -338,16 +328,15 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self),
/* rndv_req is the internal request to perform the get operation */
if (!ucp_dt_have_rndv_lanes(&rndv_req->send.state.dt)) {
/* TODO Not all UCTs need registration on the recv side */
ucp_request_rndv_reg(rndv_req);
rndv_req->send.reg_rsc = UCP_NULL_RESOURCE;
ucp_request_rndv_mem_reg(rndv_req);
rndv_req->send.rndv_get.rkey->lane_idx = 0;
}

lane_idx = rndv_req->send.rndv_get.rkey->lane_idx;
rsc_index = ucp_ep_get_rsc_index(ep, ucp_ep_get_rndv_get_lane(ep, lane_idx));
align = ep->worker->ifaces[rsc_index].attr.cap.get.opt_zcopy_align;
ucp_mtu = ep->worker->ifaces[rsc_index].attr.cap.get.align_mtu;
lanes = rndv_req->send.rndv_get.rkey->lane_num;
lanes = rndv_req->send.rndv_get.rkey->num_lanes;

ucs_trace_data("ep: %p try to progress get_zcopy for rndv get. rndv_req: %p. lane: %d",
ep, rndv_req, rndv_req->send.lane);
Expand All @@ -360,7 +349,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self),
} else {
length = ucs_min(rndv_req->send.length - offset,
ucs_min(ucp_ep_config(ep)->tag.rndv.max_get_zcopy,
((rndv_req->send.length) + ucs_max(lanes, align) - 1) /
((rndv_req->send.length) - 1) /
lanes));
}

Expand Down Expand Up @@ -393,12 +382,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self),
if (rndv_req->send.state.uct_comp.count == 0) {
ucp_rndv_complete_rndv_get(rndv_req);
}
} else {
if (status == UCS_OK) {
} else if (status == UCS_OK) {
/* if not all chunks have been transmitted yet, return the in-progress
 * status */
return UCS_INPROGRESS;
}
}

return status;
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/tag/rndv.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
#include <ucp/proto/proto.h>

enum {
UCP_RNDV_RTS_FLAG_PACKED_RKEY = UCS_BIT(0),
UCP_RNDV_RTS_FLAG_OFFLOAD = UCS_BIT(1)
UCP_RNDV_RTS_FLAG_PACKED_RKEY = UCS_BIT(0),
UCP_RNDV_RTS_FLAG_OFFLOAD = UCS_BIT(1)
};

/*
Expand Down
44 changes: 23 additions & 21 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,9 @@ static ucs_status_t ucp_wireup_add_rndv_lanes(ucp_ep_h ep,

if ((params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) &&
(params->err_mode == UCP_ERR_HANDLING_MODE_PEER)) {
max_lanes = ucs_min(1, ep->worker->context->config.ext.max_rndv_lanes);
*num_rndv_lanes_p = 0;
ucs_trace_func("rendezvous: disabled due to error mode");
return UCS_OK;
} else {
max_lanes = ucs_min(UCP_MAX_RNDV_LANES, ep->worker->context->config.ext.max_rndv_lanes);
}
Expand All @@ -829,27 +831,27 @@ static ucs_status_t ucp_wireup_add_rndv_lanes(ucp_ep_h ep,
status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria,
tl_bitmap, remote_md_map, 0,
&rsc_index, &addr_index, &score);
if (status == UCS_OK) {
/* Add lane description and remove all occurrences of the remote md */
dst_md_index = address_list[addr_index].md_index;
remote_md_map &= ~UCS_BIT(dst_md_index);
tl_bitmap = ucp_wireup_unset_tl_by_md(ep, tl_bitmap, rsc_index);

if ((strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") != NULL)) {
/* a temporary workaround to prevent the ugni uct from using rndv */
continue;
}
if ( status != UCS_OK) {
break;
}

rndv_lanes++;
/* Add lane description and remove all occurrences of the remote md */
dst_md_index = address_list[addr_index].md_index;
remote_md_map &= ~UCS_BIT(dst_md_index);
tl_bitmap = ucp_wireup_unset_tl_by_md(ep, tl_bitmap, rsc_index);

ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
dst_md_index, score,
UCP_WIREUP_LANE_USAGE_RNDV, 0);
if (ep->worker->context->tl_rscs[rsc_index].tl_rsc.dev_type == UCT_DEVICE_TYPE_SHM) {
/* In case if selected SHM transport - leave it alone, disable multirail */
break;
}
} else {
if ((strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") != NULL)) {
/* a temporary workaround to prevent the ugni uct from using rndv */
continue;
}

rndv_lanes++;

ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
dst_md_index, score,
UCP_WIREUP_LANE_USAGE_RNDV, 0);
if (ep->worker->context->tl_rscs[rsc_index].tl_rsc.dev_type == UCT_DEVICE_TYPE_SHM) {
/* If an SHM transport was selected, use it alone and disable multirail */
break;
}
}
Expand Down Expand Up @@ -1044,7 +1046,7 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params,
}
}

/* Sort RNDV, RMA and AMO lanes according to score */
/* Sort RMA and AMO lanes according to score */
ucs_qsort_r(key->rma_lanes, UCP_MAX_LANES, sizeof(ucp_lane_index_t),
ucp_wireup_compare_lane_rma_score, lane_descs);
ucs_qsort_r(key->amo_lanes, UCP_MAX_LANES, sizeof(ucp_lane_index_t),
Expand Down
6 changes: 0 additions & 6 deletions src/ucs/sys/compiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@
( (_type*)( (char*)(void*)(_ptr) - ucs_offsetof(_type, _member) ) )


/**
* Size of statically-declared array
* May be used for arrays in structure values
*/
#define ucs_countof(_array) (sizeof(_array) / sizeof((_array)[0]))

/**
* Size of statically-declared array
*/
Expand Down

0 comments on commit 1d7597f

Please sign in to comment.