Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCX/RNDV: multirail - updated EP configuration #1981

Merged
merged 4 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ static ucs_config_field_t ucp_config_table[] = {
"the eager_zcopy protocol",
ucs_offsetof(ucp_config_t, ctx.rndv_perf_diff), UCS_CONFIG_TYPE_DOUBLE},

{"MAX_RNDV_LANES", "1",
"Set max multirail-get rendezvous lane numbers",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we get rid of multirail word here as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multi-rail is still here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brminich @yosefe
"Set max rendezvous-get lane numbers"
will work?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Maximal number of devices on which a rendezvous operation may be executed in parallel."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

ucs_offsetof(ucp_config_t, ctx.max_rndv_lanes), UCS_CONFIG_TYPE_UINT},

{"ZCOPY_THRESH", "auto",
"Threshold for switching from buffer copy to zero copy protocol",
ucs_offsetof(ucp_config_t, ctx.zcopy_thresh), UCS_CONFIG_TYPE_MEMUNITS},
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ typedef struct ucp_context_config {
int use_mt_mutex;
/** On-demand progress */
int adaptive_progress;
/** Rendezvous multirail support */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multilanes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you agree with multilanes instead of multi-rail?

unsigned max_rndv_lanes;
} ucp_context_config_t;


Expand Down
50 changes: 27 additions & 23 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key)
{
memset(key, 0, sizeof(*key));
key->num_lanes = 0;
key->num_rndv_lanes = 0;
key->am_lane = UCP_NULL_LANE;
key->rndv_lane = UCP_NULL_LANE;
key->wireup_lane = UCP_NULL_LANE;
key->tag_lane = UCP_NULL_LANE;
key->reachable_md_map = 0;
key->err_mode = UCP_ERR_HANDLING_MODE_NONE;
key->status = UCS_OK;
memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes));
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
memset(key->rndv_lanes, UCP_NULL_LANE, sizeof(key->rndv_lanes));
memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes));
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
}

ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,
Expand Down Expand Up @@ -171,7 +172,7 @@ ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid,
key.lanes[0].rsc_index = UCP_NULL_RESOURCE;
key.lanes[0].dst_md_index = UCP_NULL_RESOURCE;
key.am_lane = 0;
key.rndv_lane = 0;
key.rndv_lanes[0] = 0;
key.wireup_lane = 0;

ep->cfg_index = ucp_worker_get_ep_config(worker, &key);
Expand Down Expand Up @@ -511,16 +512,17 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane;


if ((key1->num_lanes != key2->num_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->rndv_lane != key2->rndv_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_lane != key2->wireup_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->status != key2->status))
if ((key1->num_lanes != key2->num_lanes) ||
(key1->num_rndv_lanes != key2->num_rndv_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
memcmp(key1->rndv_lanes, key2->rndv_lanes, sizeof(key1->rndv_lanes)) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_lane != key2->wireup_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->status != key2->status))
{
return 0;
}
Expand Down Expand Up @@ -809,7 +811,8 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
if (!ucp_ep_is_tag_offload_enabled(config)) {
/* Tag offload is disabled, AM will be used for all
* tag-matching protocols */
ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lane,
/* TODO: set threshold level based on all available lanes */
ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lanes[0],
UCT_IFACE_FLAG_GET_ZCOPY,
max_rndv_thresh);
config->tag.eager = config->am;
Expand All @@ -823,7 +826,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)

/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
if (ucp_ep_config_get_rma_prio(config->key.rma_lanes, lane) == -1) {
if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
}

Expand Down Expand Up @@ -927,8 +930,8 @@ static void ucp_ep_config_print_rma_proto(FILE *stream, const char *name,
fprintf(stream, "..(inf)\n");
}

int ucp_ep_config_get_rma_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane)
int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane)
{
int prio;
for (prio = 0; prio < UCP_MAX_LANES; ++prio) {
Expand Down Expand Up @@ -986,13 +989,13 @@ void ucp_ep_config_lane_info_str(ucp_context_h context,
snprintf(p, endp - p, "md[%d]", key->lanes[lane].dst_md_index);
p += strlen(p);

prio = ucp_ep_config_get_rma_prio(key->rma_lanes, lane);
prio = ucp_ep_config_get_multi_lane_prio(key->rma_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " rma#%d", prio);
p += strlen(p);
}

prio = ucp_ep_config_get_rma_prio(key->amo_lanes, lane);
prio = ucp_ep_config_get_multi_lane_prio(key->amo_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " amo#%d", prio);
p += strlen(p);
Expand All @@ -1003,8 +1006,9 @@ void ucp_ep_config_lane_info_str(ucp_context_h context,
p += strlen(p);
}

if (lane == key->rndv_lane) {
snprintf(p, endp - p, " zcopy_rndv");
prio = ucp_ep_config_get_multi_lane_prio(key->rndv_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " zcopy_rndv#%d", prio);
p += strlen(p);
}

Expand Down Expand Up @@ -1058,7 +1062,7 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker,

if (context->config.features & UCP_FEATURE_RMA) {
for (lane = 0; lane < config->key.num_lanes; ++lane) {
if (ucp_ep_config_get_rma_prio(config->key.rma_lanes, lane) == -1) {
if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
}
ucp_ep_config_print_rma_proto(stream, "put", lane,
Expand Down
10 changes: 7 additions & 3 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ typedef struct ucp_ep_config_key {

ucp_lane_index_t num_lanes; /* Number of active lanes */

ucp_lane_index_t num_rndv_lanes; /* Number of rendezvous lanes */

struct {
ucp_rsc_index_t rsc_index; /* Resource index */
ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy
Expand All @@ -73,10 +75,12 @@ typedef struct ucp_ep_config_key {
} lanes[UCP_MAX_LANES];

ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */
ucp_lane_index_t rndv_lane; /* Lane for zcopy Rendezvous (can be NULL) */
ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */
ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */

/* Lane for zcopy rendezvous (can be NULL) */
ucp_lane_index_t rndv_lanes[UCP_MAX_LANES];

/* Lanes for remote memory access, sorted by priority, highest first */
ucp_lane_index_t rma_lanes[UCP_MAX_LANES];

Expand Down Expand Up @@ -269,8 +273,8 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config);
int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);

int ucp_ep_config_get_rma_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane);
int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane);

size_t ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt,
const uct_linear_growth_t *reg_cost,
Expand Down
23 changes: 14 additions & 9 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ static inline ucp_lane_index_t ucp_ep_get_wireup_msg_lane(ucp_ep_h ep)
return (lane == UCP_NULL_LANE) ? ucp_ep_get_am_lane(ep) : lane;
}

static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep)
static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep, ucp_lane_index_t idx)
{
ucs_assert(ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE);
return ucp_ep_config(ep)->key.rndv_lane;
ucs_assert(ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE);
return ucp_ep_config(ep)->key.rndv_lanes[idx];
}

static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep)
Expand All @@ -44,9 +44,14 @@ static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep)
return ucp_ep_config(ep)->key.tag_lane;
}

static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep)
static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep, ucp_lane_index_t idx)
{
return ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_LANE;
return ucp_ep_config(ep)->key.rndv_lanes[idx] != UCP_NULL_LANE;
}

static inline int ucp_ep_rndv_num_lanes(ucp_ep_h ep)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better use UCS_F_ALWAYS_INLINE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all other funcs there are static inline
will add separate PR to set UCS_F_ALWAYS_INLINE

{
return ucp_ep_config(ep)->key.num_rndv_lanes;
}

static inline int ucp_ep_is_tag_offload_enabled(ucp_ep_config_t *config)
Expand All @@ -65,9 +70,9 @@ static inline uct_ep_h ucp_ep_get_am_uct_ep(ucp_ep_h ep)
return ep->uct_eps[ucp_ep_get_am_lane(ep)];
}

static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep)
static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep, ucp_lane_index_t idx)
{
return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep)];
return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep, idx)];
}

static inline uct_ep_h ucp_ep_get_tag_uct_ep(ucp_ep_h ep)
Expand Down Expand Up @@ -120,9 +125,9 @@ static inline const uct_md_attr_t* ucp_ep_md_attr(ucp_ep_h ep, ucp_lane_index_t
return &context->tl_mds[ucp_ep_md_index(ep, lane)].attr;
}

static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep)
static inline uint64_t ucp_ep_rndv_md_flags(ucp_ep_h ep, ucp_lane_index_t idx)
{
return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep))->cap.flags;
return ucp_ep_md_attr(ep, ucp_ep_get_rndv_get_lane(ep, idx))->cap.flags;
}

static inline const char* ucp_ep_peer_name(ucp_ep_h ep)
Expand Down
8 changes: 5 additions & 3 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
status = uct_md_mem_reg(uct_md, buffer, length, UCT_MD_MEM_ACCESS_RMA,
&state->dt.contig.memh);
&state->dt.contig[0].memh);
break;
case UCP_DATATYPE_IOV:
iovcnt = state->dt.iov.iovcnt;
Expand Down Expand Up @@ -245,8 +245,9 @@ UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg,

switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
if (state->dt.contig.memh != UCT_MEM_HANDLE_NULL) {
uct_md_mem_dereg(uct_md, state->dt.contig.memh);
if (state->dt.contig[0].memh != UCT_MEM_HANDLE_NULL) {
uct_md_mem_dereg(uct_md, state->dt.contig[0].memh);
state->dt.contig[0].memh = UCT_MEM_HANDLE_NULL;
}
break;
case UCP_DATATYPE_IOV:
Expand Down Expand Up @@ -344,3 +345,4 @@ ucp_request_send_start(ucp_request_t *req, ssize_t max_short,

return UCS_ERR_NO_PROGRESS;
}

13 changes: 7 additions & 6 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ struct ucp_request {
} proxy;

struct {
uint64_t remote_address; /* address of the sender's data buffer */
uintptr_t remote_request; /* pointer to the sender's send request */
uct_rkey_bundle_t rkey_bundle;
ucp_request_t *rreq; /* receive request on the recv side */
uint64_t remote_address; /* address of the sender's data buffer */
uintptr_t remote_request; /* pointer to the sender's send request */
uct_rkey_bundle_t rkey_bundle;
ucp_request_t *rreq; /* receive request on the recv side */
} rndv_get;

struct {
Expand Down Expand Up @@ -216,6 +216,7 @@ typedef struct ucp_recv_desc {


extern ucs_mpool_ops_t ucp_request_mpool_ops;
extern ucs_mpool_ops_t ucp_rndv_get_mpool_ops;


int ucp_request_pending_add(ucp_request_t *req, ucs_status_t *req_status);
Expand All @@ -225,8 +226,8 @@ ucs_status_t ucp_request_send_buffer_reg(ucp_request_t *req, ucp_lane_index_t la
void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane);

ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
void *buffer, size_t length,
ucp_datatype_t datatype, ucp_dt_state_t *state);
void *buffer, size_t length, ucp_datatype_t datatype,
ucp_dt_state_t *state);

void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
ucp_datatype_t datatype, ucp_dt_state_t *state);
Expand Down
9 changes: 8 additions & 1 deletion src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ ucp_request_send_state_reset(ucp_request_t *req,
/* Fall through */
case UCP_REQUEST_SEND_PROTO_RNDV_GET:
if (UCP_DT_IS_CONTIG(req->send.datatype)) {
req->send.state.dt.dt.contig.memh = UCT_MEM_HANDLE_NULL;
ucp_dt_clear_rndv_lanes(&req->send.state.dt);
}
/* Fall through */
case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
Expand Down Expand Up @@ -331,3 +331,10 @@ static UCS_F_ALWAYS_INLINE void ucp_request_send_tag_stat(ucp_request_t *req)
UCP_EP_STAT_TAG_OP(req->send.ep, EAGER);
}
}

static UCS_F_ALWAYS_INLINE
uct_rkey_bundle_t *ucp_tag_rndv_rkey(ucp_request_t *req)
{
return &req->send.rndv_get.rkey_bundle;
}

1 change: 1 addition & 0 deletions src/ucp/core/ucp_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ UCP_UINT_TYPE(UCP_MD_INDEX_BITS) ucp_md_map_t;

/* Lanes */
#define UCP_MAX_LANES 8
#define UCP_MAX_RNDV_LANES 4
#define UCP_NULL_LANE ((ucp_lane_index_t)-1)
typedef uint8_t ucp_lane_index_t;
UCP_UINT_TYPE(UCP_MAX_LANES) ucp_lane_map_t;
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
ucp_ep_config_key_t key = ucp_ep_config(ucp_ep)->key;
key.am_lane = 0;
key.wireup_lane = 0;
key.rndv_lane = 0;
key.rndv_lanes[0] = 0;
key.tag_lane = 0;
key.amo_lanes[0] = 0;
key.rma_lanes[0] = 0;
Expand Down
29 changes: 26 additions & 3 deletions src/ucp/dt/dt.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "dt_iov.h"
#include "dt_generic.h"

#include <ucp/core/ucp_types.h>
#include <uct/api/uct.h>
#include <ucp/api/ucp.h>

Expand All @@ -23,8 +24,8 @@ typedef struct ucp_dt_state {
size_t offset; /* Total offset in overall payload. */
union {
struct {
uct_mem_h memh;
} contig;
uct_mem_h memh;
} contig[UCP_MAX_RNDV_LANES];
struct {
size_t iov_offset; /* Offset in the IOV item */
size_t iovcnt_offset; /* The IOV item to start copy */
Expand All @@ -41,4 +42,26 @@ typedef struct ucp_dt_state {
size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src,
ucp_dt_state_t *state, size_t length);

#endif
static UCS_F_ALWAYS_INLINE void
ucp_dt_clear_rndv_lanes(ucp_dt_state_t *state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is worth not to mention RNDV, as the code is rather generic and is in dt.h

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

{
int i;
for (i = 0; i < UCP_MAX_RNDV_LANES; i++) {
state->dt.contig[i].memh = UCT_MEM_HANDLE_NULL;
}
}

static UCS_F_ALWAYS_INLINE int
ucp_dt_is_empty_rndv_lane(ucp_dt_state_t *state, int idx)
{
return state->dt.contig[idx].memh == UCT_MEM_HANDLE_NULL;
}

static UCS_F_ALWAYS_INLINE int
ucp_dt_have_rndv_lanes(ucp_dt_state_t *state)
{
return !ucp_dt_is_empty_rndv_lane(state, 0);
}

#endif /* UCP_DT_H_ */

2 changes: 1 addition & 1 deletion src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void ucp_dt_iov_copy_uct(uct_iov_t *iov, size_t *iovcnt, size_t max_dst_iov,
case UCP_DATATYPE_CONTIG:
iov[0].buffer = (void *)src_iov + state->offset;
iov[0].length = length_max;
iov[0].memh = state->dt.contig.memh;
iov[0].memh = state->dt.contig[0].memh;
iov[0].stride = 0;
iov[0].count = 1;

Expand Down
Loading