Skip to content

Commit

Permalink
UCP:rndv multirail support infrastructure + protocol (5)
Browse files Browse the repository at this point in the history
- renamed rail->lanes
  • Loading branch information
Sergey Oblomov committed Nov 6, 2017
1 parent d992f41 commit ab4b3ff
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 28 deletions.
10 changes: 5 additions & 5 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ ucp_request_send_start(ucp_request_t *req, ssize_t max_short,
return UCS_ERR_NO_PROGRESS;
}

int ucp_request_mrail_reg(ucp_request_t *req)
int ucp_request_rndv_reg(ucp_request_t *req)
{
ucp_ep_t *ep = req->send.ep;
ucp_dt_state_t *state = &req->send.state.dt;
Expand All @@ -364,7 +364,7 @@ int ucp_request_mrail_reg(ucp_request_t *req)

ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype));

ucp_dt_clear_rails(state);
ucp_dt_clear_rndv_lanes(state);

for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) {
lane = ucp_ep_get_rndv_get_lane(ep, i);
Expand All @@ -381,13 +381,13 @@ int ucp_request_mrail_reg(ucp_request_t *req)
return cnt;
}

void ucp_request_mrail_dereg(ucp_request_t *req)
void ucp_request_rndv_dereg(ucp_request_t *req)
{
ucp_dt_state_t *state = &req->send.state.dt;
ucs_status_t status;
int i;

for (i = 0; i < UCP_MAX_RAILS; i++) {
for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
if (state->dt.contig[i].memh != UCT_MEM_HANDLE_NULL) {
status = uct_md_mem_dereg(ucp_ep_md(req->send.ep,
ucp_ep_get_rndv_get_lane(req->send.ep, i)),
Expand All @@ -397,7 +397,7 @@ void ucp_request_mrail_dereg(ucp_request_t *req)
}

req->send.reg_rsc = UCP_NULL_RESOURCE;
ucp_dt_clear_rails(state);
ucp_dt_clear_rndv_lanes(state);
}


6 changes: 3 additions & 3 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ enum {
typedef struct ucp_rndv_get_rkey {
ucp_lane_index_t lane_num; /* number of rkeys obtained from peer */
ucp_lane_index_t lane_idx;
uct_rkey_bundle_t rkey_bundle[UCP_MAX_RAILS];
uct_rkey_bundle_t rkey_bundle[UCP_MAX_RNDV_LANES];
} ucp_rndv_get_rkey_t;


Expand Down Expand Up @@ -239,7 +239,7 @@ ucs_status_t ucp_request_send_start(ucp_request_t *req, ssize_t max_short,
size_t zcopy_thresh, size_t multi_thresh,
size_t rndv_thresh, const ucp_proto_t *proto);

int ucp_request_mrail_reg(ucp_request_t *req);
void ucp_request_mrail_dereg(ucp_request_t *req);
int ucp_request_rndv_reg(ucp_request_t *req);
void ucp_request_rndv_dereg(ucp_request_t *req);

#endif
6 changes: 3 additions & 3 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ ucp_request_send_state_reset(ucp_request_t *req,
/* Fall through */
case UCP_REQUEST_SEND_PROTO_RNDV_GET:
if (UCP_DT_IS_CONTIG(req->send.datatype)) {
ucp_dt_clear_rails(&req->send.state.dt);
ucp_dt_clear_rndv_lanes(&req->send.state.dt);
}
/* Fall through */
case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
Expand Down Expand Up @@ -340,7 +340,7 @@ ucp_request_rndv_get_create(ucp_request_t *req)
req->send.rndv_get.rkey->lane_idx = 0;
req->send.rndv_get.rkey->lane_num = 0;

for (i = 0; i < UCP_MAX_RAILS; i++) {
for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
ucp_tag_rndv_rkey(req, i)->rkey = UCT_INVALID_RKEY;
}
}
Expand All @@ -354,7 +354,7 @@ ucp_request_rndv_get_release(ucp_request_t *req)

ucs_assert(req->send.rndv_get.rkey != NULL);

for (i = 0; i < UCP_MAX_RAILS; i++) {
for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
if (ucp_tag_rndv_rkey(req, i)->rkey != UCT_INVALID_RKEY) {
uct_rkey_release(ucp_tag_rndv_rkey(req, i));
}
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ UCP_UINT_TYPE(UCP_MD_INDEX_BITS) ucp_md_map_t;

/* Lanes */
#define UCP_MAX_LANES 8
#define UCP_MAX_RAILS 4
#define UCP_MAX_RNDV_LANES 4
#define UCP_NULL_LANE ((ucp_lane_index_t)-1)
typedef uint8_t ucp_lane_index_t;
UCP_UINT_TYPE(UCP_MAX_LANES) ucp_lane_map_t;
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
/* Create epoll set which combines events from all transports */
status = ucp_worker_wakeup_init(worker, params);
if (status != UCS_OK) {
goto err_mrail_mp_cleanup;
goto err_rndv_lanes_mp_cleanup;
}

if (params->field_mask & UCP_WORKER_PARAM_FIELD_CPU_MASK) {
Expand Down Expand Up @@ -1142,7 +1142,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
err_close_ifaces:
ucp_worker_close_ifaces(worker);
ucp_worker_wakeup_cleanup(worker);
err_mrail_mp_cleanup:
err_rndv_lanes_mp_cleanup:
ucs_mpool_cleanup(&worker->rndv_get_mp, 1);
err_req_mp_cleanup:
ucs_mpool_cleanup(&worker->req_mp, 1);
Expand Down
14 changes: 7 additions & 7 deletions src/ucp/dt/dt.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ typedef struct ucp_dt_state {
union {
struct {
uct_mem_h memh;
} contig[UCP_MAX_RAILS];
} contig[UCP_MAX_RNDV_LANES];
};
struct {
size_t iov_offset; /* Offset in the IOV item */
Expand Down Expand Up @@ -123,24 +123,24 @@ ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size,
}

static UCS_F_ALWAYS_INLINE void
ucp_dt_clear_rails(ucp_dt_state_t *state)
ucp_dt_clear_rndv_lanes(ucp_dt_state_t *state)
{
int i;
for (i = 0; i < UCP_MAX_RAILS; i++) {
for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
state->dt.contig[i].memh = UCT_MEM_HANDLE_NULL;
}
}

static UCS_F_ALWAYS_INLINE int
ucp_dt_is_empty_rail(ucp_dt_state_t *state, int rail)
ucp_dt_is_empty_rndv_lane(ucp_dt_state_t *state, int idx)
{
return state->dt.contig[rail].memh == UCT_MEM_HANDLE_NULL;
return state->dt.contig[idx].memh == UCT_MEM_HANDLE_NULL;
}

static UCS_F_ALWAYS_INLINE int
ucp_dt_have_rails(ucp_dt_state_t *state)
ucp_dt_have_rndv_lanes(ucp_dt_state_t *state)
{
return !ucp_dt_is_empty_rail(state, 0);
return !ucp_dt_is_empty_rndv_lane(state, 0);
}

#endif
12 changes: 6 additions & 6 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ static void ucp_rndv_rma_request_send_buffer_dereg(ucp_request_t *sreq)
* (state->dt.contig.memh != UCT_MEM_HANDLE_NULL)
*/
if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
ucp_request_mrail_dereg(sreq);
ucp_request_rndv_dereg(sreq);
}
}

Expand Down Expand Up @@ -87,12 +87,12 @@ static size_t ucp_tag_rndv_pack_rkeys(ucp_request_t *sreq, void *rkey_buf, uint1

ucs_assert(UCP_DT_IS_CONTIG(sreq->send.datatype));

cnt = ucp_request_mrail_reg(sreq);
ucs_assert_always(cnt <= UCP_MAX_RAILS);
cnt = ucp_request_rndv_reg(sreq);
ucs_assert_always(cnt <= UCP_MAX_RNDV_LANES);

if (cnt) {
for (i = 0; i < ucp_ep_rndv_num_lanes(ep); i++) {
ucs_assert_always(!ucp_dt_is_empty_rail(state, i));
ucs_assert_always(!ucp_dt_is_empty_rndv_lane(state, i));

lane = ucp_ep_get_rndv_get_lane(ep, i);

Expand Down Expand Up @@ -336,9 +336,9 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self),
}

/* rndv_req is the internal request to perform the get operation */
if (!ucp_dt_have_rails(&rndv_req->send.state.dt)) {
if (!ucp_dt_have_rndv_lanes(&rndv_req->send.state.dt)) {
/* TODO Not all UCTs need registration on the recv side */
ucp_request_mrail_reg(rndv_req);
ucp_request_rndv_reg(rndv_req);
rndv_req->send.reg_rsc = UCP_NULL_RESOURCE;
rndv_req->send.rndv_get.rkey->lane_idx = 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ static ucs_status_t ucp_wireup_add_rndv_lanes(ucp_ep_h ep,
(params->err_mode == UCP_ERR_HANDLING_MODE_PEER)) {
max_lanes = ucs_min(1, ep->worker->context->config.ext.max_rndv_lanes);
} else {
max_lanes = ucs_min(UCP_MAX_RAILS, ep->worker->context->config.ext.max_rndv_lanes);
max_lanes = ucs_min(UCP_MAX_RNDV_LANES, ep->worker->context->config.ext.max_rndv_lanes);
}

while (rndv_lanes < max_lanes) {
Expand Down

0 comments on commit ab4b3ff

Please sign in to comment.