Skip to content

Commit

Permalink
UCP:rndv multirail support infrastructure + protocol (4)
Browse files Browse the repository at this point in the history
- added wireup configuration for mrail
- added context parameter to enable mrail (disabled by default)
  • Loading branch information
Sergey Oblomov committed Oct 13, 2017
1 parent 7cb752d commit bdda5df
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 43 deletions.
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ static ucs_config_field_t ucp_config_table[] = {
"the eager_zcopy protocol",
ucs_offsetof(ucp_config_t, ctx.rndv_perf_diff), UCS_CONFIG_TYPE_DOUBLE},

{"RNDV_MRAIL", "n",
"Enable multirail-get rendezvous support",
ucs_offsetof(ucp_config_t, ctx.rndv_mrail), UCS_CONFIG_TYPE_BOOL},

{"ZCOPY_THRESH", "auto",
"Threshold for switching from buffer copy to zero copy protocol",
ucs_offsetof(ucp_config_t, ctx.zcopy_thresh), UCS_CONFIG_TYPE_MEMUNITS},
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ typedef struct ucp_context_config {
int use_mt_mutex;
/** On-demand progress */
int adaptive_progress;
/** Rendezvous multirail support */
int rndv_mrail;
} ucp_context_config_t;


Expand Down
18 changes: 9 additions & 9 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ ucp_request_complete_send(ucp_request_t *req, ucs_status_t status)
ucs_status_string(status));
UCS_PROFILE_REQUEST_EVENT(req, "complete_send", status);

if (ucs_unlikely(req->flags & UCP_REQUEST_FLAG_RNDV_MRAIL)) {
ucs_mpool_put_inline(req->send.rndv_get.mrail);
}

ucp_request_complete(req, send.cb, status);
}

Expand All @@ -101,10 +97,6 @@ ucp_request_complete_recv(ucp_request_t *req, ucs_status_t status)
--req->recv.worker->context->tm.offload.sw_req_count;
}

if (ucs_unlikely(req->flags & UCP_REQUEST_FLAG_RNDV_MRAIL)) {
ucs_mpool_put_inline(req->send.rndv_get.mrail);
}

ucp_request_complete(req, recv.cb, status, &req->recv.info);
}

Expand Down Expand Up @@ -208,6 +200,14 @@ ucp_request_mrail_create(ucp_request_t *req)
req->flags |= UCP_REQUEST_FLAG_RNDV_MRAIL;
}

static UCS_F_ALWAYS_INLINE void
ucp_request_mrail_release(ucp_request_t *req)
{
ucs_trace_req("mrail release request %p", req);
ucs_mpool_put_inline(req->send.rndv_get.mrail);
req->flags &= ~UCP_REQUEST_FLAG_RNDV_MRAIL;
}

static UCS_F_ALWAYS_INLINE void
ucp_request_clear_rails(ucp_dt_state_t *state) {
int i;
Expand Down Expand Up @@ -240,7 +240,7 @@ static inline int ucp_request_mrail_reg(ucp_request_t *req)

ucp_request_clear_rails(state);

for (i = 0; ucp_ep_is_rndv_lane_present(ep, i) && i < UCP_MAX_RAILS; i++) {
for (i = 0; i < UCP_MAX_RAILS && ucp_ep_is_rndv_lane_present(ep, i); i++) {
lane = ucp_ep_get_rndv_get_lane(ep, i);

if (ucp_ep_rndv_md_flags(ep, lane) & UCT_MD_FLAG_NEED_RKEY) {
Expand Down
8 changes: 5 additions & 3 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ static void ucp_tag_rndv_unpack_mrail_rkeys(ucp_request_t *req, void *rkey_buf)

ucp_request_mrail_create(req);

for (i = 0; ucp_ep_is_rndv_lane_present(ep, i) && i < UCP_MAX_RAILS; i++) {
for (i = 0; i < UCP_MAX_RAILS && ucp_ep_is_rndv_lane_present(ep, i); i++) {
lane = ucp_ep_get_rndv_get_lane(ep, i);
if (ucp_ep_rndv_md_flags(ep, lane) & UCT_MD_FLAG_NEED_RKEY) {
UCS_PROFILE_CALL(uct_rkey_unpack, rkey_buf + packet,
Expand All @@ -119,8 +119,6 @@ static void ucp_tag_rndv_unpack_mrail_rkeys(ucp_request_t *req, void *rkey_buf)
packet += ucp_ep_md_attr(ep, lane)->rkey_packed_size;
}
}

req->flags |= UCP_REQUEST_FLAG_RNDV_MRAIL;
}

static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
Expand Down Expand Up @@ -416,6 +414,10 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_get_completion, (self, status),
{
ucp_request_t *rndv_req = ucs_container_of(self, ucp_request_t, send.uct_comp);

if (rndv_req->flags & UCP_REQUEST_FLAG_RNDV_MRAIL) {
ucp_request_mrail_release(rndv_req);
}

if (rndv_req->send.state.offset == rndv_req->send.length) {
ucs_trace_req("completed rndv get operation rndv_req: %p", rndv_req);
ucp_rndv_complete_rndv_get(rndv_req);
Expand Down
101 changes: 70 additions & 31 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ enum {
UCP_WIREUP_LANE_USAGE_RMA = UCS_BIT(1),
UCP_WIREUP_LANE_USAGE_AMO = UCS_BIT(2),
UCP_WIREUP_LANE_USAGE_RNDV = UCS_BIT(3),
UCP_WIREUP_LANE_USAGE_TAG = UCS_BIT(4)
UCP_WIREUP_LANE_USAGE_TAG = UCS_BIT(4),
UCP_WIREUP_LANE_USAGE_RNDV_MRAIL = UCS_BIT(5)
};


Expand Down Expand Up @@ -396,6 +397,23 @@ static int ucp_wireup_compare_lane_amo_score(const void *elem1, const void *elem
return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, amo);
}

static uint64_t ucp_wireup_unset_tl_by_md(ucp_ep_h ep, uint64_t tl_bitmap,
ucp_rsc_index_t rsc_index)
{
ucp_worker_h worker = ep->worker;
ucp_context_h context = worker->context;
ucp_rsc_index_t md_index = context->tl_rscs[rsc_index].md_index;
ucp_rsc_index_t i;

for (i = 0; i < context->num_tls; i++) {
if (context->tl_rscs[i].md_index == md_index) {
tl_bitmap &= ~UCS_BIT(i);
}
}

return tl_bitmap;
}

static UCS_F_NOINLINE ucs_status_t
ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count,
const ucp_address_entry_t *address_list,
Expand Down Expand Up @@ -440,26 +458,36 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count,
dst_md_index = address_list_copy[addr_index].md_index;
reg_score = score;

if (!(usage & UCP_WIREUP_LANE_USAGE_RNDV_MRAIL)) {
/* Select additional transports which can access allocated memory, but only
* if their scores are better. We need this because a remote memory block can
* be potentially allocated using one of them, and we might get better performance
* than the transports which support only registered remote memory.
*/
snprintf(title, sizeof(title), criteria->title, "allocated");
mem_criteria.title = title;
mem_criteria.remote_md_flags = UCT_MD_FLAG_ALLOC;
} else {
tl_bitmap = ucp_wireup_unset_tl_by_md(ep, tl_bitmap, rsc_index);

if (strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni")) {
/* a temporary workaround to prevent the ugni uct from using rndv */
goto out_free_address_list;
}
}

/* Add to the list of lanes and remove all occurrences of the remote md
* from the address list, to avoid selecting the same remote md again.*/
ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
dst_md_index, score, usage, 0);
remote_md_map &= ~UCS_BIT(dst_md_index);

/* Select additional transports which can access allocated memory, but only
* if their scores are better. We need this because a remote memory block can
* be potentially allocated using one of them, and we might get better performance
* than the transports which support only registered remote memory.
*/
snprintf(title, sizeof(title), criteria->title, "allocated");
mem_criteria.title = title;
mem_criteria.remote_md_flags = UCT_MD_FLAG_ALLOC;

while (address_count > 0) {
status = ucp_wireup_select_transport(ep, address_list_copy, address_count,
&mem_criteria, tl_bitmap, remote_md_map,
0, &rsc_index, &addr_index, &score);
if ((status != UCS_OK) || (score <= reg_score)) {
if ((status != UCS_OK) ||
((score <= reg_score) && !(usage & UCP_WIREUP_LANE_USAGE_RNDV_MRAIL))) {
break;
}

Expand All @@ -468,6 +496,10 @@ ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count,
ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
dst_md_index, score, usage, 0);
remote_md_map &= ~UCS_BIT(dst_md_index);

if (usage & UCP_WIREUP_LANE_USAGE_RNDV_MRAIL) {
tl_bitmap = ucp_wireup_unset_tl_by_md(ep, tl_bitmap, rsc_index);
}
}

status = UCS_OK;
Expand Down Expand Up @@ -690,12 +722,12 @@ static ucs_status_t ucp_wireup_add_am_lane(ucp_ep_h ep, const ucp_ep_params_t *p
return UCS_OK;
}

static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep,
const ucp_ep_params_t *params,
unsigned address_count,
const ucp_address_entry_t *address_list,
ucp_wireup_lane_desc_t *lane_descs,
ucp_lane_index_t *num_lanes_p)
static ucs_status_t ucp_wireup_add_rndv_lanes(ucp_ep_h ep,
const ucp_ep_params_t *params,
unsigned address_count,
const ucp_address_entry_t *address_list,
ucp_wireup_lane_desc_t *lane_descs,
ucp_lane_index_t *num_lanes_p)
{
ucp_wireup_criteria_t criteria;
ucp_rsc_index_t rsc_index;
Expand All @@ -721,17 +753,24 @@ static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep,
criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS;
}

status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria,
-1, -1, 0, &rsc_index, &addr_index, &score);
if ((status == UCS_OK) &&
/* a temporary workaround to prevent the ugni uct from using rndv */
(strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") == NULL)) {
ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
address_list[addr_index].md_index, score,
UCP_WIREUP_LANE_USAGE_RNDV, 0);
if (ep->worker->context->config.ext.rndv_mrail) {
status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria,
-1, -1, 0, &rsc_index, &addr_index, &score);
if ((status == UCS_OK) &&
/* a temporary workaround to prevent the ugni uct from using rndv */
(strstr(ep->worker->context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni") == NULL)) {
ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
address_list[addr_index].md_index, score,
UCP_WIREUP_LANE_USAGE_RNDV, 0);
}
return status;
} else {
return ucp_wireup_add_memaccess_lanes(ep, address_count, address_list,
lane_descs, num_lanes_p, &criteria,
-1,
UCP_WIREUP_LANE_USAGE_RNDV |
UCP_WIREUP_LANE_USAGE_RNDV_MRAIL);
}

return UCS_OK;
}

/* Lane for transport offloaded tag interface */
Expand Down Expand Up @@ -886,8 +925,8 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params,
return status;
}

status = ucp_wireup_add_rndv_lane(ep, params, address_count, address_list,
lane_descs, &key->num_lanes);
status = ucp_wireup_add_rndv_lanes(ep, params, address_count, address_list,
lane_descs, &key->num_lanes);
if (status != UCS_OK) {
return status;
}
Expand Down Expand Up @@ -924,8 +963,8 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params,
}
if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RNDV) {
/* TODO: add rndv sort */
ucs_assert(key->rndv_lanes[0] == UCP_NULL_LANE);
key->rndv_lanes[0] = lane;
ucs_assert(key->rndv_lanes[lane] == UCP_NULL_LANE);
key->rndv_lanes[lane] = lane;
}
if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RMA) {
key->rma_lanes[lane] = lane;
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ static ucs_status_t ucp_wireup_msg_send(ucp_ep_h ep, uint8_t type,
ucs_status_t status;
void *address;

ucs_trace("%s: type: %d", __FUNCTION__, type);

ucs_assert(ep->cfg_index != (uint8_t)-1);

/* We cannot allocate from memory pool because it's not thread safe
Expand Down

0 comments on commit bdda5df

Please sign in to comment.