Skip to content

Commit

Permalink
Merge pull request #16 from hoopoepg/topic/pre-pending-rma-get
Browse files Browse the repository at this point in the history
RNDV/GET: try to push request to pending
  • Loading branch information
yosefe authored Jul 14, 2020
2 parents 2572339 + 49f965f commit e514f4f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 75 deletions.
14 changes: 8 additions & 6 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,14 @@ struct ucp_request {
} proxy;

struct {
uint64_t remote_address; /* address of the sender's data buffer */
uintptr_t remote_request; /* pointer to the sender's request */
ucp_request_t *rreq; /* receive request on the recv side */
ucp_rkey_h rkey; /* key for remote send buffer */
ucp_lane_map_t lanes_map; /* used lanes map */
ucp_lane_index_t lane_count; /* number of lanes used in transaction */
uint64_t remote_address; /* address of the sender's data buffer */
uintptr_t remote_request; /* pointer to the sender's request */
ucp_request_t *rreq; /* receive request on the recv side */
ucp_rkey_h rkey; /* key for remote send buffer */
ucp_lane_map_t lanes_map_avail; /* used lanes map */
ucp_lane_map_t lanes_map_all; /* actual lanes map */
uint8_t lanes_count; /* number of actually used lanes */
uint8_t rkey_index[UCP_MAX_LANES];
} rndv_get;

struct {
Expand Down
157 changes: 88 additions & 69 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,6 @@ static int ucp_rndv_is_recv_pipeline_needed(ucp_request_t *rndv_req,
return 1;
}

/* Pick an RMA-capable lane for a rendezvous GET on this request's endpoint,
 * skipping any lane whose bit is set in 'ignore'. Returns the selected lane
 * (UCP_NULL_LANE when none qualifies) and stores the matching unpacked
 * remote key in *uct_rkey_p. */
static ucp_lane_index_t
ucp_rndv_req_get_zcopy_rma_lane(ucp_request_t *rndv_req, ucp_lane_map_t ignore,
                                uct_rkey_t *uct_rkey_p)
{
    ucp_ep_h ep              = rndv_req->send.ep;
    ucp_ep_config_t *config  = ucp_ep_config(ep);
    ucp_context_h context    = ep->worker->context;

    return ucp_rkey_find_rma_lane(context, config, rndv_req->send.mem_type,
                                  config->tag.rndv.get_zcopy_lanes,
                                  rndv_req->send.rndv_get.rkey, ignore,
                                  uct_rkey_p);
}

static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status)
{
ucp_worker_h worker;
Expand Down Expand Up @@ -468,66 +455,41 @@ static void ucp_rndv_req_send_rtr(ucp_request_t *rndv_req, ucp_request_t *rreq,
ucp_request_send(rndv_req, 0);
}

/* Lazily resolve how many distinct RMA lanes can serve this rendezvous GET
 * and cache the result in rndv_get.lane_count, capped by the configured
 * max_rndv_lanes limit. Subsequent calls are no-ops. */
static void ucp_rndv_get_lanes_count(ucp_request_t *rndv_req)
{
    ucp_context_h context = rndv_req->send.ep->worker->context;
    ucp_lane_map_t used   = 0;
    ucp_lane_index_t lane;
    uct_rkey_t uct_rkey;

    if (ucs_likely(rndv_req->send.rndv_get.lane_count != 0)) {
        /* counting was already done by a previous call */
        return;
    }

    /* enumerate usable lanes by repeatedly asking for a lane not yet seen */
    for (;;) {
        lane = ucp_rndv_req_get_zcopy_rma_lane(rndv_req, used, &uct_rkey);
        if (lane == UCP_NULL_LANE) {
            break;
        }
        used |= UCS_BIT(lane);
        ++rndv_req->send.rndv_get.lane_count;
    }

    rndv_req->send.rndv_get.lane_count =
        ucs_min(rndv_req->send.rndv_get.lane_count,
                context->config.ext.max_rndv_lanes);
}

static ucp_lane_index_t
ucp_rndv_get_zcopy_get_lane(ucp_request_t *rndv_req, uct_rkey_t *uct_rkey)
{
ucp_lane_index_t lane;

lane = ucp_rndv_req_get_zcopy_rma_lane(rndv_req,
rndv_req->send.rndv_get.lanes_map,
uct_rkey);
ucp_lane_index_t lane_idx;
ucp_ep_config_t *ep_config;
ucp_rkey_h rkey;
uint8_t rkey_index;

if ((lane == UCP_NULL_LANE) && (rndv_req->send.rndv_get.lanes_map != 0)) {
/* lanes_map != 0 - no more lanes to pick (but BW lanes do exist, since the
 * map is non-zero - we found at least one lane on a previous iteration).
 * Reset the used-lanes map to zero and iterate over the lanes again */
rndv_req->send.rndv_get.lanes_map = 0;
lane = ucp_rndv_req_get_zcopy_rma_lane(rndv_req,
rndv_req->send.rndv_get.lanes_map,
uct_rkey);
if (ucs_unlikely(!rndv_req->send.rndv_get.lanes_map_all)) {
return UCP_NULL_LANE;
}

return lane;
lane_idx = ucs_ffs64_safe(rndv_req->send.rndv_get.lanes_map_avail);
rkey = rndv_req->send.rndv_get.rkey;
rkey_index = rndv_req->send.rndv_get.rkey_index[lane_idx];
*uct_rkey = (rkey_index != UCP_NULL_RESOURCE) ?
rkey->tl_rkey[rkey_index].rkey.rkey : UCT_INVALID_RKEY;
ep_config = ucp_ep_config(rndv_req->send.ep);
return ep_config->tag.rndv.get_zcopy_lanes[lane_idx];
}

static void ucp_rndv_get_zcopy_next_lane(ucp_request_t *rndv_req)
{
/* mask this lane for the next iteration, so that next time it will
 * not be selected and we continue with another lane */
ucp_ep_h ep = rndv_req->send.ep;

rndv_req->send.rndv_get.lanes_map |= UCS_BIT(rndv_req->send.lane);

/* in case too many lanes have been masked - reset the mask to zero
 * so that the first lane is selected next time */
if (ucs_popcount(rndv_req->send.rndv_get.lanes_map) >=
ep->worker->context->config.ext.max_rndv_lanes) {
rndv_req->send.rndv_get.lanes_map = 0;
rndv_req->send.rndv_get.lanes_map_avail &= rndv_req->send.rndv_get.lanes_map_avail - 1;
if (!rndv_req->send.rndv_get.lanes_map_avail) {
rndv_req->send.rndv_get.lanes_map_avail = rndv_req->send.rndv_get.lanes_map_all;
}
}

/* Returns the number of lanes participating in this rendezvous GET
 * transaction (computed when the lane map was initialized). */
static UCS_F_ALWAYS_INLINE size_t
ucp_rndv_get_zcopy_lane_count(ucp_request_t *rndv_req)
{
    size_t count = rndv_req->send.rndv_get.lanes_count;
    return count;
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
uct_pending_req_t *self)
{
Expand All @@ -549,8 +511,6 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
int pending_add_res;
ucp_lane_index_t lane;

ucp_rndv_get_lanes_count(rndv_req);

/* Figure out which lane to use for get operation */
rndv_req->send.lane = lane = ucp_rndv_get_zcopy_get_lane(rndv_req, &uct_rkey);

Expand Down Expand Up @@ -586,7 +546,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
length = ucp_mtu - remaining;
} else {
chunk = ucs_align_up((size_t)(ucs_min(rndv_req->send.length /
rndv_req->send.rndv_get.lane_count,
ucp_rndv_get_zcopy_lane_count(rndv_req),
max_zcopy) * config->tag.rndv.scale[lane]),
align);
length = ucs_min(chunk, rndv_req->send.length - offset);
Expand Down Expand Up @@ -696,10 +656,67 @@ static void ucp_rndv_put_completion(uct_completion_t *self, ucs_status_t status)
}
}

/* Build the lane maps used by the rendezvous GET zero-copy protocol:
 * for each configured get_zcopy lane, decide whether it can serve this
 * request's memory type / remote key, record the index of the remote key
 * component that lane should use in rkey_index[], and set the lane's bit
 * in the map. On return lanes_map_all (full set), lanes_map_avail
 * (lanes still unused in the current round-robin pass) and lanes_count
 * are all initialized from the computed map. */
static void
ucp_rndv_req_init_zcopy_lane_map(ucp_request_t *rndv_req)
{
    ucp_ep_h ep = rndv_req->send.ep;
    ucp_ep_config_t *ep_config = ucp_ep_config(ep);
    ucp_context_h context = ep->worker->context;
    ucs_memory_type_t mem_type = rndv_req->send.mem_type;
    ucp_rkey_h rkey = rndv_req->send.rndv_get.rkey;
    ucp_lane_map_t lane_map;
    ucp_lane_index_t lane;
    ucp_md_index_t md_index;
    uct_md_attr_t *md_attr;
    ucp_md_index_t dst_md_index;
    int i;

    lane_map = 0;
    for (i = 0; i < UCP_MAX_LANES; i++) {
        lane = ep_config->tag.rndv.get_zcopy_lanes[i];
        if (lane == UCP_NULL_LANE) {
            break; /* no more configured get_zcopy lanes */
        }

        md_index = ep_config->md_index[lane];
        md_attr = &context->tl_mds[md_index].attr;

        if (ucs_unlikely((md_index != UCP_NULL_RESOURCE) &&
            !(md_attr->cap.flags & UCT_MD_FLAG_NEED_RKEY))) {
            /* MD does not require an rkey for remote access: the lane may be
             * used with an invalid rkey, provided either no rkey was given at
             * all, or the memory types on both sides match the MD's access
             * memory type */
            if (!rkey || ((mem_type == md_attr->cap.access_mem_type) &&
                (mem_type == rkey->mem_type))) {
                /* UCP_NULL_RESOURCE in rkey_index marks "no rkey needed" for
                 * this lane (see the lane-selection path) */
                rndv_req->send.rndv_get.rkey_index[i] = UCP_NULL_RESOURCE;
                lane_map |= UCS_BIT(i);
                continue;
            }
        }

        if (ucs_unlikely((md_index != UCP_NULL_RESOURCE) &&
                         (!(md_attr->cap.reg_mem_types & UCS_BIT(mem_type))))) {
            /* local MD cannot register this memory type - skip the lane */
            continue;
        }

        /* NOTE(review): rkey is dereferenced below, while the branch above
         * tolerates rkey == NULL; presumably rkey is only NULL on paths where
         * every lane's MD does not need an rkey - confirm against callers */
        dst_md_index = ep_config->key.lanes[lane].dst_md_index;
        if (ucs_likely(rkey->md_map & UCS_BIT(dst_md_index))) {
            /* the unpacked rkey has a component for the lane's destination MD:
             * remember its position within the packed rkey and enable the lane */
            rndv_req->send.rndv_get.rkey_index[i] = ucs_bitmap2idx(rkey->md_map,
                                                                   dst_md_index);
            lane_map |= UCS_BIT(i);
        }
    }

    rndv_req->send.rndv_get.lanes_map_all = lane_map;
    rndv_req->send.rndv_get.lanes_map_avail = lane_map;
    rndv_req->send.rndv_get.lanes_count = ucs_popcount(lane_map);
}


static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rreq,
const ucp_rndv_rts_hdr_t *rndv_rts_hdr)
{
ucs_status_t status;
uct_rkey_t uct_rkey;

ucp_trace_req(rndv_req, "start rma_get rreq %p", rreq);

Expand All @@ -711,8 +728,6 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr
rndv_req->send.rndv_get.remote_request = rndv_rts_hdr->sreq.reqptr;
rndv_req->send.rndv_get.remote_address = rndv_rts_hdr->address;
rndv_req->send.rndv_get.rreq = rreq;
rndv_req->send.rndv_get.lanes_map = 0;
rndv_req->send.rndv_get.lane_count = 0;
rndv_req->send.datatype = rreq->recv.datatype;

status = ucp_ep_rkey_unpack(rndv_req->send.ep, rndv_rts_hdr + 1,
Expand All @@ -725,8 +740,14 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr
ucp_request_send_state_init(rndv_req, ucp_dt_make_contig(1), 0);
ucp_request_send_state_reset(rndv_req, ucp_rndv_get_completion,
UCP_REQUEST_SEND_PROTO_RNDV_GET);
ucp_rndv_req_init_zcopy_lane_map(rndv_req);
rndv_req->send.lane = ucp_rndv_get_zcopy_get_lane(rndv_req, &uct_rkey);
rndv_req->send.pending_lane = UCP_NULL_LANE;

ucp_request_send(rndv_req, 0);
if ((rndv_req->send.lane == UCP_NULL_LANE) ||
!ucp_request_pending_add(rndv_req, &status, 0)) {
ucp_request_send(rndv_req, 0);
}
}

static void ucp_rndv_send_frag_rtr(ucp_worker_h worker, ucp_request_t *rndv_req,
Expand Down Expand Up @@ -1502,11 +1523,9 @@ static ucs_status_t ucp_rndv_pipeline(ucp_request_t *sreq,
freq->send.rndv_get.rkey = NULL;
freq->send.rndv_get.remote_address =
(uint64_t)UCS_PTR_BYTE_OFFSET(fsreq->send.buffer, offset);
freq->send.rndv_get.lanes_map = 0;
freq->send.rndv_get.lane_count = 0;
freq->send.rndv_get.rreq = fsreq;
freq->send.mdesc = mdesc;

ucp_rndv_req_init_zcopy_lane_map(freq);
}

ucp_request_send(freq, 0);
Expand Down

0 comments on commit e514f4f

Please sign in to comment.