From af95804e5738201ab98f2ec8f1791683aa12cd4b Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Wed, 3 May 2017 11:37:08 +0300 Subject: [PATCH 1/4] UCP: preparations for the tag matching offload --- src/ucp/core/ucp_context.c | 12 ++ src/ucp/core/ucp_context.h | 5 +- src/ucp/core/ucp_ep.c | 285 +++++++++++++++++++----------- src/ucp/core/ucp_ep.h | 84 ++++++--- src/ucp/core/ucp_ep.inl | 13 +- src/ucp/core/ucp_types.h | 1 + src/ucp/core/ucp_worker.c | 62 ++++--- src/ucp/core/ucp_worker.h | 21 ++- src/ucp/proto/proto.h | 4 +- src/ucp/proto/proto_am.inl | 4 +- src/ucp/rma/basic_rma.c | 8 +- src/ucp/tag/rndv.c | 16 +- src/ucp/tag/tag_match.c | 2 + src/ucp/tag/tag_match.h | 4 + src/ucp/tag/tag_recv.c | 3 + src/ucp/tag/tag_send.c | 47 +++-- src/ucp/wireup/address.c | 16 +- src/ucp/wireup/address.h | 1 + src/ucp/wireup/select.c | 70 +++++++- src/ucp/wireup/stub_ep.c | 8 +- src/ucp/wireup/wireup.c | 4 +- src/ucp/wireup/wireup.h | 2 +- src/ucs/sys/sys.c | 5 + src/ucs/sys/sys.h | 1 + test/gtest/ucp/test_ucp_wireup.cc | 2 +- 25 files changed, 466 insertions(+), 214 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index cc889afe376..3e7f30a5eb4 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -7,6 +7,7 @@ #include "ucp_context.h" #include "ucp_request.h" +#include #include #include @@ -129,6 +130,11 @@ static ucs_config_field_t ucp_config_table[] = { "y - Use mutex for multithreading support in UCP.\n", ucs_offsetof(ucp_config_t, ctx.use_mt_mutex), UCS_CONFIG_TYPE_BOOL}, + {"TM_THRESH", "1024", + "Threshold for using tag matching offload capabilities.\n" + "Smaller buffers will not be posted to the transport.", + ucs_offsetof(ucp_config_t, ctx.tm_thresh), UCS_CONFIG_TYPE_MEMUNITS}, + {NULL} }; @@ -709,6 +715,12 @@ static ucs_status_t ucp_fill_config(ucp_context_h context, : UCP_MT_TYPE_SPINLOCK); context->config.ext = config->ctx; + /* Post threshold for tag offload should not be less than ucp_request_hdr_t + * size, because this header may be scattered to user buffer in case of + * expected SW RNDV protocol. */ + context->tm.post_thresh = ucs_max(context->config.ext.tm_thresh, + sizeof(ucp_request_hdr_t)); + /* always init MT lock in context even though it is disabled by user, * because we need to use context lock to protect ucp_mm_ and ucp_rkey_ * routines */ diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 9ab54e4e328..1f77265141f 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -37,6 +37,9 @@ typedef struct ucp_context_config { size_t bcopy_bw; /** Size of packet data that is dumped to the log system in debug mode */ size_t log_data_size; + /** Threshold for using tag matching offload capabilities. Smaller buffers + * will not be posted to the transport. 
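The threshold floor introduced above can be illustrated standalone. A minimal sketch, assuming a hypothetical fake_request_hdr_t in place of the real ucp_request_hdr_t from ucp_request.h:

    #include <stddef.h>
    #include <stdio.h>

    /* stand-in for ucp_request_hdr_t (the real layout lives in ucp_request.h) */
    typedef struct {
        unsigned long reqptr;
        unsigned      flags;
    } fake_request_hdr_t;

    /* Effective posting threshold: never below the request header size,
     * because an expected SW RNDV may scatter this header into the posted
     * user buffer, which therefore must be large enough to hold it. */
    static size_t effective_post_thresh(size_t cfg_tm_thresh)
    {
        size_t floor = sizeof(fake_request_hdr_t);
        return (cfg_tm_thresh > floor) ? cfg_tm_thresh : floor;
    }

    int main(void)
    {
        printf("%zu\n", effective_post_thresh(1024)); /* default: 1024 */
        printf("%zu\n", effective_post_thresh(4));    /* clamped to header size */
        return 0;
    }

Receives shorter than this threshold stay in the software matching path.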
*/ + size_t tm_thresh; /** Maximal size of worker name for debugging */ unsigned max_worker_name; /** Atomic mode */ @@ -99,7 +102,7 @@ typedef struct ucp_context { ucp_tl_resource_desc_t *tl_rscs; /* Array of communication resources */ ucp_rsc_index_t num_tls; /* Number of resources in the array*/ - ucp_tag_match_t tm; /* Tag-matching queues */ + ucp_tag_match_t tm; /* Tag-matching queues and offload info */ struct { diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 990412f0cbe..79f376088bb 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -38,6 +38,7 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key) key->am_lane = UCP_NULL_LANE; key->rndv_lane = UCP_NULL_LANE; key->wireup_lane = UCP_NULL_LANE; + key->tag_lane = UCP_NULL_LANE; key->reachable_md_map = 0; memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes)); memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes)); @@ -216,7 +217,7 @@ ucs_status_t ucp_ep_create(ucp_worker_h worker, ucp_ep_config(ep)->err_mode = params->err_mode; if (params->err_mode == UCP_ERR_HANDLING_MODE_PEER) { /* Disable RNDV */ - ucp_ep_config(ep)->rndv.am_thresh = SIZE_MAX; + ucp_ep_config(ep)->tag.rndv.am_thresh = SIZE_MAX; } } else { ucp_ep_config(ep)->err_mode = UCP_ERR_HANDLING_MODE_NONE; @@ -567,6 +568,7 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, (key1->reachable_md_map != key2->reachable_md_map) || (key1->am_lane != key2->am_lane) || (key1->rndv_lane != key2->rndv_lane) || + (key1->tag_lane != key2->tag_lane) || (key1->wireup_lane != key2->wireup_lane)) { return 0; @@ -632,7 +634,8 @@ static size_t ucp_ep_config_calc_rndv_thresh(ucp_context_h context, } static void ucp_ep_config_set_am_rndv_thresh(ucp_context_h context, uct_iface_attr_t *iface_attr, - uct_md_attr_t *md_attr, ucp_ep_config_t *config) + uct_md_attr_t *md_attr, ucp_ep_config_t *config, + size_t adjust_min_val) { size_t rndv_thresh; @@ -653,7 +656,96 @@ static void ucp_ep_config_set_am_rndv_thresh(ucp_context_h context, uct_iface_at ucs_assert(iface_attr->cap.am.min_zcopy <= iface_attr->cap.am.max_zcopy); /* use rendezvous only starting from minimal zero-copy am size */ rndv_thresh = ucs_max(rndv_thresh, iface_attr->cap.am.min_zcopy); - config->rndv.am_thresh = rndv_thresh; + config->tag.rndv.am_thresh = ucs_min(rndv_thresh, adjust_min_val); +} + +static void ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, + ucp_ep_config_t *config, + ucp_lane_index_t lane, + uint64_t rndv_cap_flag, + size_t adjust_min_val) +{ + ucp_context_t *context = worker->context; + ucp_rsc_index_t rsc_index; + size_t rndv_thresh; + uct_iface_attr_t *iface_attr; + uct_md_attr_t *md_attr; + + if (lane != UCP_NULL_LANE) { + rsc_index = config->key.lanes[lane].rsc_index; + if (rsc_index != UCP_NULL_RESOURCE) { + iface_attr = &worker->ifaces[rsc_index].attr; + md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; + ucs_assert_always(iface_attr->cap.flags & rndv_cap_flag); + + if (context->config.ext.rndv_thresh == UCS_CONFIG_MEMUNITS_AUTO) { + /* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own.*/ + rndv_thresh = ucp_ep_config_calc_rndv_thresh(context, iface_attr, + md_attr, SIZE_MAX, 1); + } else { + /* In order to disable rendezvous, need to set the threshold to + * infinite (-1). 
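The two clamps applied to the configured value above reduce to a max followed by a min. A condensed sketch of that arithmetic, with the auto-calculated value replaced by a made-up placeholder (the real code calls ucp_ep_config_calc_rndv_thresh()):

    #include <stddef.h>

    static size_t clamp_rndv_thresh(int is_auto, size_t cfg_thresh,
                                    size_t min_get_zcopy, size_t adjust_min_val)
    {
        /* placeholder for the auto-calculated threshold */
        size_t thresh = is_auto ? 16384 : cfg_thresh;

        /* rendezvous only from the minimal zero-copy get size upward */
        if (thresh < min_get_zcopy) {
            thresh = min_get_zcopy;
        }
        /* and never above the cap passed in by the caller */
        return (thresh < adjust_min_val) ? thresh : adjust_min_val;
    }

Without an offload cap (adjust_min_val == SIZE_MAX), passing cfg_thresh = (size_t)-1 keeps every message length below the threshold, which is what "set the threshold to infinite" means in the comment above.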
+ */ + rndv_thresh = context->config.ext.rndv_thresh; + } + + /* use rendezvous only starting from minimal zero-copy get size */ + ucs_assert(iface_attr->cap.get.min_zcopy <= iface_attr->cap.get.max_zcopy); + rndv_thresh = ucs_max(rndv_thresh, iface_attr->cap.get.min_zcopy); + + config->tag.rndv.max_get_zcopy = iface_attr->cap.get.max_zcopy; + config->tag.rndv.rma_thresh = ucs_min(rndv_thresh, adjust_min_val); + } else { + ucs_debug("rendezvous (get_zcopy) protocol is not supported "); + } + } +} + +static void ucp_ep_config_init_attrs(ucp_worker_t *worker, ucp_rsc_index_t rsc_index, + ucp_ep_msg_config_t *config, size_t max_short, + size_t max_bcopy, size_t max_zcopy, + size_t max_iov, uint64_t short_flag, + uint64_t bcopy_flag, uint64_t zcopy_flag, + unsigned hdr_len, size_t adjust_min_val) +{ + ucp_context_t *context = worker->context; + uct_iface_attr_t *iface_attr = &worker->ifaces[rsc_index].attr; + uct_md_attr_t *md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; + size_t it; + size_t zcopy_thresh; + + + if (iface_attr->cap.flags & short_flag) { + config->max_short = max_short - hdr_len; + } else { + config->max_short = -1; + } + + if (iface_attr->cap.flags & bcopy_flag) { + config->max_bcopy = max_bcopy; + } + + if ((iface_attr->cap.flags & zcopy_flag) && (md_attr->cap.flags & UCT_MD_FLAG_REG)) { + config->max_zcopy = max_zcopy; + config->max_iov = ucs_min(UCP_MAX_IOV, max_iov); + + if (context->config.ext.zcopy_thresh == UCS_CONFIG_MEMUNITS_AUTO) { + config->zcopy_auto_thresh = 1; + for (it = 0; it < UCP_MAX_IOV; ++it) { + zcopy_thresh = ucp_ep_config_get_zcopy_auto_thresh(it + 1, + &md_attr->reg_cost, + context, + iface_attr->bandwidth); + zcopy_thresh = ucs_min(zcopy_thresh, adjust_min_val); + config->sync_zcopy_thresh[it] = zcopy_thresh; + config->zcopy_thresh[it] = zcopy_thresh; + } + } else { + config->zcopy_auto_thresh = 0; + config->sync_zcopy_thresh[0] = config->zcopy_thresh[0] = + ucs_min(context->config.ext.zcopy_thresh, adjust_min_val); + } + } } void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) @@ -661,22 +753,30 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) ucp_context_h context = worker->context; ucp_ep_rma_config_t *rma_config; uct_iface_attr_t *iface_attr; - ucp_rsc_index_t rsc_index; uct_md_attr_t *md_attr; + ucp_rsc_index_t rsc_index; ucp_lane_index_t lane; - size_t zcopy_thresh, rndv_thresh, it; + size_t it; + size_t max_rndv_thresh; /* Default settings */ for (it = 0; it < UCP_MAX_IOV; ++it) { - config->am.zcopy_thresh[it] = SIZE_MAX; - config->am.sync_zcopy_thresh[it] = SIZE_MAX; + config->am.zcopy_thresh[it] = SIZE_MAX; + config->am.sync_zcopy_thresh[it] = SIZE_MAX; + config->tag.eager.zcopy_thresh[it] = SIZE_MAX; + config->tag.eager.sync_zcopy_thresh[it] = SIZE_MAX; } - config->am.zcopy_auto_thresh = 0; - config->bcopy_thresh = context->config.ext.bcopy_thresh; - config->rndv.rma_thresh = SIZE_MAX; - config->rndv.max_get_zcopy = SIZE_MAX; - config->rndv.am_thresh = SIZE_MAX; - config->p2p_lanes = 0; + config->tag.eager.zcopy_auto_thresh = 0; + config->am.zcopy_auto_thresh = 0; + config->p2p_lanes = 0; + config->bcopy_thresh = context->config.ext.bcopy_thresh; + config->tag.lane = UCP_NULL_LANE; + config->tag.proto = &ucp_tag_eager_proto; + config->tag.sync_proto = &ucp_tag_eager_sync_proto; + config->tag.rndv.rma_thresh = SIZE_MAX; + config->tag.rndv.max_get_zcopy = SIZE_MAX; + config->tag.rndv.am_thresh = SIZE_MAX; + max_rndv_thresh = SIZE_MAX; /* Collect p2p lanes */ for (lane = 0; lane < 
config->key.num_lanes; ++lane) { @@ -688,56 +788,69 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) } } - /* Configuration for active messages */ + /* Configuration for tag offload */ + if (config->key.tag_lane != UCP_NULL_LANE) { + lane = config->key.tag_lane; + rsc_index = config->key.lanes[lane].rsc_index; + if (rsc_index != UCP_NULL_RESOURCE) { + iface_attr = &worker->ifaces[rsc_index].attr; + ucp_ep_config_init_attrs(worker, rsc_index, &config->tag.eager, + iface_attr->cap.tag.eager.max_short, + iface_attr->cap.tag.eager.max_bcopy, + iface_attr->cap.tag.eager.max_zcopy, + iface_attr->cap.tag.eager.max_iov, + UCT_IFACE_FLAG_TAG_EAGER_SHORT, + UCT_IFACE_FLAG_TAG_EAGER_BCOPY, + UCT_IFACE_FLAG_TAG_EAGER_ZCOPY, 0, + iface_attr->cap.tag.eager.max_bcopy); + + config->tag.offload.max_recv_iov = iface_attr->cap.tag.recv.max_iov; + config->tag.rndv.max_iov = iface_attr->cap.tag.rndv.max_iov; + config->tag.sync_proto = NULL; + config->tag.proto = NULL; + config->tag.lane = lane; + config->tag.offload.enabled = 1; + max_rndv_thresh = iface_attr->cap.tag.eager.max_zcopy; + + ucp_ep_config_set_rndv_thresh(worker, config, lane, + UCT_IFACE_FLAG_TAG_RNDV_ZCOPY, + max_rndv_thresh); + } + } + if (config->key.am_lane != UCP_NULL_LANE) { lane = config->key.am_lane; rsc_index = config->key.lanes[lane].rsc_index; if (rsc_index != UCP_NULL_RESOURCE) { - iface_attr = &worker->iface_attrs[rsc_index]; - md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; - - if (iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) { - config->am.max_eager_short = iface_attr->cap.am.max_short - - sizeof(ucp_eager_hdr_t); - config->am.max_short = iface_attr->cap.am.max_short - - sizeof(uint64_t); - } else { - config->am.max_eager_short = -1; - config->am.max_short = -1; - } - - if (iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY) { - config->am.max_bcopy = iface_attr->cap.am.max_bcopy; - } - - if ((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_ZCOPY) && - (md_attr->cap.flags & UCT_MD_FLAG_REG)) - { - config->am.max_zcopy = iface_attr->cap.am.max_zcopy; - config->am.max_iovcnt = ucs_min(UCP_MAX_IOV, iface_attr->cap.am.max_iov); - - if (context->config.ext.zcopy_thresh == UCS_CONFIG_MEMUNITS_AUTO) { - /* auto */ - config->am.zcopy_auto_thresh = 1; - for (it = 0; it < UCP_MAX_IOV; ++it) { - zcopy_thresh = ucp_ep_config_get_zcopy_auto_thresh( - it + 1, &md_attr->reg_cost, context, - iface_attr->bandwidth); - config->am.sync_zcopy_thresh[it] = zcopy_thresh; - config->am.zcopy_thresh[it] = ucs_max(zcopy_thresh, - iface_attr->cap.am.min_zcopy); - } - } else { - config->am.sync_zcopy_thresh[0] = context->config.ext.zcopy_thresh; - config->am.zcopy_thresh[0] = ucs_max(context->config.ext.zcopy_thresh, - iface_attr->cap.am.min_zcopy); - } + iface_attr = &worker->ifaces[rsc_index].attr; + md_attr = &context->tl_mds[rsc_index].attr; + ucp_ep_config_init_attrs(worker, rsc_index, &config->am, + iface_attr->cap.am.max_short, + iface_attr->cap.am.max_bcopy, + iface_attr->cap.am.max_zcopy, + iface_attr->cap.am.max_iov, + UCT_IFACE_FLAG_AM_SHORT, + UCT_IFACE_FLAG_AM_BCOPY, + UCT_IFACE_FLAG_AM_ZCOPY, + sizeof(ucp_eager_hdr_t), SIZE_MAX); + + /* Calculate rndv threshold for AM Rendezvous, which may be used by + * any tag-matching protocol (AM and offload). 
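ucp_ep_config_init_attrs() lets the same initializer serve both call sites above: the tag-offload lane passes the UCT_IFACE_FLAG_TAG_EAGER_* caps with hdr_len 0, the AM lane passes the UCT_IFACE_FLAG_AM_* caps with hdr_len sizeof(ucp_eager_hdr_t). The shape of that parameterization, condensed with simplified types:

    #include <stddef.h>
    #include <stdint.h>

    typedef struct {
        ptrdiff_t max_short;  /* -1 when short sends are unsupported */
        size_t    max_bcopy;
    } msg_config_t;

    static void msg_config_init(msg_config_t *cfg, uint64_t cap_flags,
                                uint64_t short_flag, uint64_t bcopy_flag,
                                size_t max_short, size_t max_bcopy,
                                size_t hdr_len)
    {
        /* caller chooses which capability bits apply (AM vs. tag eager) */
        cfg->max_short = (cap_flags & short_flag) ?
                         (ptrdiff_t)(max_short - hdr_len) : -1;
        if (cap_flags & bcopy_flag) {
            cfg->max_bcopy = max_bcopy;
        }
    }

(The zcopy-threshold part of the real helper is omitted here for brevity.)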
*/ + ucp_ep_config_set_am_rndv_thresh(context, iface_attr, md_attr, config, + max_rndv_thresh); + + if (!config->tag.offload.enabled) { + /* Tag offload is disabled, AM will be used for all + * tag-matching protocols */ + ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lane, + UCT_IFACE_FLAG_GET_ZCOPY, + max_rndv_thresh); + config->tag.eager = config->am; + config->tag.lane = lane; } - - /* calculate an rndv threshold for AM Rendezvous */ - ucp_ep_config_set_am_rndv_thresh(context, iface_attr, md_attr, config); } else { - config->am.max_bcopy = UCP_MIN_BCOPY; /* Stub endpoint */ + /* Stub endpoint */ + config->am.max_bcopy = UCP_MIN_BCOPY; } } @@ -749,7 +862,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) rma_config = &config->rma[lane]; rsc_index = config->key.lanes[lane].rsc_index; - iface_attr = &worker->iface_attrs[rsc_index]; + iface_attr = &worker->ifaces[rsc_index].attr; rma_config->put_zcopy_thresh = SIZE_MAX; rma_config->get_zcopy_thresh = SIZE_MAX; @@ -790,39 +903,6 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) rma_config->max_put_bcopy = UCP_MIN_BCOPY; /* Stub endpoint */ } } - - /* Configuration for Rendezvous data */ - if (config->key.rndv_lane != UCP_NULL_LANE) { - lane = config->key.rndv_lane; - rsc_index = config->key.lanes[lane].rsc_index; - if (rsc_index != UCP_NULL_RESOURCE) { - iface_attr = &worker->iface_attrs[rsc_index]; - md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; - ucs_assert_always(iface_attr->cap.flags & UCT_IFACE_FLAG_GET_ZCOPY); - - if (context->config.ext.rndv_thresh == UCS_CONFIG_MEMUNITS_AUTO) { - /* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own.*/ - rndv_thresh = ucp_ep_config_calc_rndv_thresh(context, iface_attr, md_attr, - SIZE_MAX, - 1); - } else { - /* In order to disable rendezvous, need to set the threshold to - * infinite (-1). - */ - rndv_thresh = context->config.ext.rndv_thresh; - } - - /* use rendezvous only starting from minimal zero-copy get size */ - ucs_assert(iface_attr->cap.get.min_zcopy <= iface_attr->cap.get.max_zcopy); - rndv_thresh = ucs_max(rndv_thresh, - iface_attr->cap.get.min_zcopy); - - config->rndv.max_get_zcopy = iface_attr->cap.get.max_zcopy; - config->rndv.rma_thresh = rndv_thresh; - } else { - ucs_debug("rendezvous (get_zcopy) protocol is not supported "); - } - } } static void ucp_ep_config_print_tag_proto(FILE *stream, const char *name, @@ -960,8 +1040,9 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker, const uint8_t *addr_indices, ucp_rsc_index_t aux_rsc_index) { - ucp_context_h context = worker->context; - char lane_info[128] = {0}; + ucp_context_h context = worker->context; + char lane_info[128] = {0}; + const ucp_ep_msg_config_t *tag_config; ucp_lane_index_t lane; for (lane = 0; lane < config->key.num_lanes; ++lane) { @@ -972,16 +1053,18 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker, fprintf(stream, "#\n"); if (context->config.features & UCP_FEATURE_TAG) { + tag_config = (config->tag.offload.enabled) ? 
&config->tag.eager : + &config->am; ucp_ep_config_print_tag_proto(stream, "tag_send", - config->am.max_eager_short, - config->am.zcopy_thresh[0], - config->rndv.rma_thresh, - config->rndv.am_thresh); + tag_config->max_short, + tag_config->zcopy_thresh[0], + config->tag.rndv.rma_thresh, + config->tag.rndv.am_thresh); ucp_ep_config_print_tag_proto(stream, "tag_send_sync", - config->am.max_eager_short, - config->am.sync_zcopy_thresh[0], - config->rndv.rma_thresh, - config->rndv.am_thresh); + tag_config->max_short, + tag_config->sync_zcopy_thresh[0], + config->tag.rndv.rma_thresh, + config->tag.rndv.am_thresh); } if (context->config.features & UCP_FEATURE_RMA) { diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 6560529e70f..433f77c6937 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -63,6 +63,7 @@ typedef struct ucp_ep_config_key { ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */ ucp_lane_index_t rndv_lane; /* Lane for zcopy Rendezvous (can be NULL) */ + ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */ ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */ /* Lanes for remote memory access, sorted by priority, highest first */ @@ -93,48 +94,77 @@ typedef struct ucp_ep_rma_config { } ucp_ep_rma_config_t; +/* + * Configuration for AM and tag offload protocols + */ +typedef struct ucp_ep_msg_config { + ssize_t max_short; + size_t max_bcopy; + size_t max_zcopy; + size_t max_iov; + + /* zero-copy threshold for operations which do not have to wait for remote side */ + size_t zcopy_thresh[UCP_MAX_IOV]; + + /* zero-copy threshold for operations which anyways have to wait for remote side */ + size_t sync_zcopy_thresh[UCP_MAX_IOV]; + uint8_t zcopy_auto_thresh; /* if != 0 the zcopy enabled */ +} ucp_ep_msg_config_t; + + typedef struct ucp_ep_config { /* A key which uniquely defines the configuration, and all other fields of * configuration (in the current worker) and defined only by it. */ - ucp_ep_config_key_t key; + ucp_ep_config_key_t key; /* Bitmap of which lanes are p2p; affects the behavior of connection * establishment protocols. 
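Since the key struct gains tag_lane, note that the field also participates in config-key identity: ucp_ep_config_is_equal() earlier in this patch compares it, so endpoints whose tag lanes differ never share a cached configuration. Trimmed to the lane fields, the check looks like:

    #include <stdint.h>

    typedef uint8_t lane_index_t;

    typedef struct {
        lane_index_t am_lane, rndv_lane, tag_lane, wireup_lane;
        uint64_t     reachable_md_map;
    } config_key_t;

    static int key_is_equal(const config_key_t *a, const config_key_t *b)
    {
        return (a->am_lane          == b->am_lane)     &&
               (a->rndv_lane        == b->rndv_lane)   &&
               (a->tag_lane         == b->tag_lane)    && /* new in this patch */
               (a->wireup_lane      == b->wireup_lane) &&
               (a->reachable_md_map == b->reachable_md_map);
    }

(The real comparison also covers num_lanes, the per-lane resource array, and the rma/amo lane arrays.)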
*/ - ucp_lane_map_t p2p_lanes; - - /* Limits for active-message based protocols */ - struct { - ssize_t max_eager_short; /* Maximal payload of eager short */ - ssize_t max_short; /* Maximal payload of am short */ - size_t max_bcopy; /* Maximal total size of am_bcopy */ - size_t max_zcopy; /* Maximal total size of am_zcopy */ - size_t max_iovcnt; /* Maximal size of iovcnt */ - /* zero-copy threshold for operations which do not have to wait for remote side */ - size_t zcopy_thresh[UCP_MAX_IOV]; - /* zero-copy threshold for operations which anyways have to wait for remote side */ - size_t sync_zcopy_thresh[UCP_MAX_IOV]; - uint8_t zcopy_auto_thresh; /* if != 0 the zcopy enabled */ - } am; + ucp_lane_map_t p2p_lanes; /* Configuration for each lane that provides RMA */ - ucp_ep_rma_config_t rma[UCP_MAX_LANES]; + ucp_ep_rma_config_t rma[UCP_MAX_LANES]; /* Threshold for switching from put_short to put_bcopy */ - size_t bcopy_thresh; + size_t bcopy_thresh; + + /* Error handling mode */ + ucp_err_handling_mode_t err_mode; + + /* Configuration for AM lane */ + ucp_ep_msg_config_t am; struct { - /* Maximal total size of rndv_get_zcopy */ - size_t max_get_zcopy; - /* Threshold for switching from eager to RMA based rendezvous */ - size_t rma_thresh; - /* Threshold for switching from eager to AM based rendezvous */ - size_t am_thresh; - } rndv; + /* Protocols used for tag matching operations + * (can be AM based or tag offload). */ + const ucp_proto_t *proto; + const ucp_proto_t *sync_proto; + + /* Lane used for tag matching operations. */ + ucp_lane_index_t lane; + + /* Configuration of the lane used for eager protocols + * (can be AM or tag offload). */ + ucp_ep_msg_config_t eager; + + struct { + /* Maximal total size of rndv_get_zcopy */ + size_t max_get_zcopy; + /* Threshold for switching from eager to RMA based rendezvous */ + size_t rma_thresh; + /* Threshold for switching from eager to AM based rendezvous */ + size_t am_thresh; + /* Maximal iov count for RNDV offload */ + size_t max_iov; + } rndv; + + struct { + size_t max_recv_iov; + unsigned enabled; + } offload; + } tag; - /* Error handling mode */ - ucp_err_handling_mode_t err_mode; } ucp_ep_config_t; diff --git a/src/ucp/core/ucp_ep.inl b/src/ucp/core/ucp_ep.inl index da9cc806a1b..b9d3b198112 100644 --- a/src/ucp/core/ucp_ep.inl +++ b/src/ucp/core/ucp_ep.inl @@ -38,6 +38,12 @@ static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep) return ucp_ep_config(ep)->key.rndv_lane; } +static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep) +{ + ucs_assert(ucp_ep_config(ep)->key.tag_lane != UCP_NULL_RESOURCE); + return ucp_ep_config(ep)->key.tag_lane; +} + static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep) { return ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_RESOURCE; @@ -53,6 +59,11 @@ static inline uct_ep_h ucp_ep_get_rndv_data_uct_ep(ucp_ep_h ep) return ep->uct_eps[ucp_ep_get_rndv_get_lane(ep)]; } +static inline uct_ep_h ucp_ep_get_tag_uct_ep(ucp_ep_h ep) +{ + return ep->uct_eps[ucp_ep_get_tag_lane(ep)]; +} + static inline ucp_rsc_index_t ucp_ep_get_rsc_index(ucp_ep_h ep, ucp_lane_index_t lane) { return ucp_ep_config(ep)->key.lanes[lane].rsc_index; @@ -60,7 +71,7 @@ static inline ucp_rsc_index_t ucp_ep_get_rsc_index(ucp_ep_h ep, ucp_lane_index_t static inline uct_iface_attr_t *ucp_ep_get_iface_attr(ucp_ep_h ep, ucp_lane_index_t lane) { - return &ep->worker->iface_attrs[ucp_ep_get_rsc_index(ep, lane)]; + return &ep->worker->ifaces[ucp_ep_get_rsc_index(ep, lane)].attr; } static inline ucp_rsc_index_t 
ucp_ep_num_lanes(ucp_ep_h ep) diff --git a/src/ucp/core/ucp_types.h b/src/ucp/core/ucp_types.h index 16af27b0571..a7e6fa4703a 100644 --- a/src/ucp/core/ucp_types.h +++ b/src/ucp/core/ucp_types.h @@ -40,6 +40,7 @@ typedef struct ucp_request ucp_request_t; typedef struct ucp_address_iface_attr ucp_address_iface_attr_t; typedef struct ucp_address_entry ucp_address_entry_t; typedef struct ucp_stub_ep ucp_stub_ep_t; +typedef struct ucp_proto ucp_proto_t; /** diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 9d8e3fcab55..730554ac905 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -44,7 +45,7 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker) ucp_rsc_index_t rsc_index; for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) { - if (worker->ifaces[rsc_index] == NULL) { + if (worker->ifaces[rsc_index].iface == NULL) { continue; } @@ -52,7 +53,17 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker) uct_wakeup_close(worker->wakeup.iface_wakeups[rsc_index]); } - uct_iface_close(worker->ifaces[rsc_index]); + if (ucp_worker_is_tl_tag_offload(worker, rsc_index)) { + ucs_queue_remove(&worker->context->tm.offload_ifaces, + &worker->ifaces[rsc_index].queue); + if (ucs_queue_length(&worker->context->tm.offload_ifaces) == 1) { + /* Enable offload, because just one tag offload capable interface left */ + worker->context->tm.post_thresh = ucs_max(worker->context->config.ext.tm_thresh, + sizeof(ucp_request_hdr_t)); + } + } + + uct_iface_close(worker->ifaces[rsc_index].iface); } } @@ -108,9 +119,9 @@ static void ucp_worker_remove_am_handlers(ucp_worker_h worker) for (tl_id = 0; tl_id < context->num_tls; ++tl_id) { for (am_id = 0; am_id < UCP_AM_ID_LAST; ++am_id) { if (context->config.features & ucp_am_handlers[am_id].features) { - (void)uct_iface_set_am_handler(worker->ifaces[tl_id], am_id, - ucp_stub_am_handler, worker, - UCT_AM_CB_FLAG_ASYNC); + (void)uct_iface_set_am_handler(worker->ifaces[tl_id].iface, + am_id, ucp_stub_am_handler, + worker, UCT_AM_CB_FLAG_ASYNC); } } } @@ -256,12 +267,12 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker, goto out; } - status = uct_iface_query(iface, &worker->iface_attrs[tl_id]); + status = uct_iface_query(iface, &worker->ifaces[tl_id].attr); if (status != UCS_OK) { goto out; } - attr = &worker->iface_attrs[tl_id]; + attr = &worker->ifaces[tl_id].attr; /* Set active message handlers for tag matching */ if ((attr->cap.flags & (UCT_IFACE_FLAG_AM_SHORT|UCT_IFACE_FLAG_AM_BCOPY|UCT_IFACE_FLAG_AM_ZCOPY))) { @@ -299,11 +310,24 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker, } } + if (ucp_worker_is_tl_tag_offload(worker, tl_id)) { + if (ucs_queue_is_empty(&context->tm.offload_ifaces)) { + context->tm.post_thresh = ucs_max(context->config.ext.tm_thresh, + sizeof(ucp_request_hdr_t)); + } else { + /* Some offload interface/s already configured. Disable TM receive offload, + * because multiple offload ifaces are not supported yet. 
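Both the add path and the close path above implement a single policy: hardware posting is enabled only while exactly one offload-capable interface is registered. A standalone sketch, with n_offload_ifaces standing in for ucs_queue_length(&context->tm.offload_ifaces):

    #include <stddef.h>
    #include <stdint.h>

    static size_t tm_post_thresh(unsigned n_offload_ifaces,
                                 size_t cfg_tm_thresh, size_t req_hdr_size)
    {
        if (n_offload_ifaces == 1) {
            /* single offload iface: enable, floored at the header size */
            return (cfg_tm_thresh > req_hdr_size) ? cfg_tm_thresh
                                                  : req_hdr_size;
        }
        /* none, or more than one (unsupported yet): never post to HW */
        return SIZE_MAX;
    }

This matches the SIZE_MAX default set in ucp_tag_match_init() before any interface is added.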
*/ + context->tm.post_thresh = SIZE_MAX; + } + worker->ifaces[tl_id].rsc_index = tl_id; + ucs_queue_push(&context->tm.offload_ifaces, &worker->ifaces[tl_id].queue); + } + ucs_debug("created interface[%d] using "UCT_TL_RESOURCE_DESC_FMT" on worker %p", tl_id, UCT_TL_RESOURCE_DESC_ARG(&resource->tl_rsc), worker); worker->wakeup.iface_wakeups[tl_id] = wakeup; - worker->ifaces[tl_id] = iface; + worker->ifaces[tl_id].iface = iface; return UCS_OK; out_close_wakeup: @@ -332,7 +356,7 @@ static void ucp_worker_init_cpu_atomics(ucp_worker_h worker) /* Enable all interfaces which have host-based atomics */ for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { - if (worker->iface_attrs[rsc_index].cap.flags & UCT_IFACE_FLAG_ATOMIC_CPU) { + if (worker->ifaces[rsc_index].attr.cap.flags & UCT_IFACE_FLAG_ATOMIC_CPU) { ucp_worker_enable_atomic_tl(worker, "cpu", rsc_index); } } @@ -370,7 +394,7 @@ static void ucp_worker_init_device_atomics(ucp_worker_h worker) rsc = &context->tl_rscs[rsc_index]; md_index = rsc->md_index; md_attr = &context->tl_mds[md_index].attr; - iface_attr = &worker->iface_attrs[rsc_index]; + iface_attr = &worker->ifaces[rsc_index].attr; if (!(md_attr->cap.flags & UCT_MD_FLAG_REG) || !ucs_test_all_flags(iface_attr->cap.flags, iface_cap_flags)) @@ -417,7 +441,7 @@ static void ucp_worker_init_guess_atomics(ucp_worker_h worker) uint64_t accumulated_flags = 0; for (rsc_index = 0; rsc_index < context->num_tls; ++rsc_index) { - accumulated_flags |= worker->iface_attrs[rsc_index].cap.flags; + accumulated_flags |= worker->ifaces[rsc_index].attr.cap.flags; } if (accumulated_flags & UCT_IFACE_FLAG_ATOMIC_DEVICE) { @@ -459,7 +483,7 @@ static ucs_status_t ucp_worker_init_am_mpool(ucp_worker_h worker, size_t max_am_mp_entry_size = 0; for (tl_id = 0; tl_id < worker->context->num_tls; ++tl_id) { - if_attr = &worker->iface_attrs[tl_id]; + if_attr = &worker->ifaces[tl_id].attr; max_am_mp_entry_size = ucs_max(max_am_mp_entry_size, if_attr->cap.am.max_short); max_am_mp_entry_size = ucs_max(max_am_mp_entry_size, @@ -566,25 +590,18 @@ ucs_status_t ucp_worker_create(ucp_context_h context, kh_init_inplace(ucp_worker_ep_hash, &worker->ep_hash); - worker->ifaces = ucs_calloc(context->num_tls, sizeof(*worker->ifaces), + worker->ifaces = ucs_calloc(context->num_tls, sizeof(ucp_worker_iface_t), "ucp iface"); if (worker->ifaces == NULL) { status = UCS_ERR_NO_MEMORY; goto err_free; } - worker->iface_attrs = ucs_calloc(context->num_tls, - sizeof(*worker->iface_attrs), - "ucp iface_attr"); - if (worker->iface_attrs == NULL) { - status = UCS_ERR_NO_MEMORY; - goto err_free_ifaces; - } /* Create statistics */ status = UCS_STATS_NODE_ALLOC(&worker->stats, &ucp_worker_stats_class, ucs_stats_get_root(), "-%p", worker); if (status != UCS_OK) { - goto err_free_attrs; + goto err_free_ifaces; } status = ucp_worker_wakeup_context_init(&worker->wakeup, context->num_tls); @@ -656,8 +673,6 @@ ucs_status_t ucp_worker_create(ucp_context_h context, ucp_worker_wakeup_context_cleanup(&worker->wakeup); err_free_stats: UCS_STATS_NODE_FREE(worker->stats); -err_free_attrs: - ucs_free(worker->iface_attrs); err_free_ifaces: ucs_free(worker->ifaces); err_free: @@ -686,7 +701,6 @@ void ucp_worker_destroy(ucp_worker_h worker) uct_worker_destroy(worker->uct); ucs_async_context_cleanup(&worker->async); ucp_worker_wakeup_context_cleanup(&worker->wakeup); - ucs_free(worker->iface_attrs); ucs_free(worker->ifaces); kh_destroy_inplace(ucp_worker_ep_hash, &worker->ep_hash); UCP_THREAD_LOCK_FINALIZE(&worker->mt_lock); diff --git 
a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index ff48ac64fd5..57d217a03e0 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -13,6 +13,7 @@ #include #include +#include #include KHASH_MAP_INIT_INT64(ucp_worker_ep_hash, ucp_ep_t *); @@ -67,6 +68,17 @@ enum { UCS_STATS_UPDATE_COUNTER((_worker)->stats, \ UCP_WORKER_STAT_TAG_RX_RNDV_##_is_exp, 1); +/** + * UCP worker iface, which encapsulates UCT iface, its attributes and + * some auxiliary info needed for tag matching offloads. + */ +typedef struct ucp_worker_iface { + uct_iface_h iface; + uct_iface_attr_t attr; + ucs_queue_elem_t queue; + ucp_rsc_index_t rsc_index; +} ucp_worker_iface_t; + /** * UCP worker wake-up context. @@ -96,8 +108,7 @@ typedef struct ucp_worker { unsigned stub_pend_count;/* Number of pending requests on stub endpoints*/ khash_t(ucp_worker_ep_hash) ep_hash; /* Hash table of all endpoints */ - uct_iface_h *ifaces; /* Array of interfaces, one for each resource */ - uct_iface_attr_t *iface_attrs; /* Array of interface attributes */ + ucp_worker_iface_t *ifaces; /* Array of interfaces, one for each resource */ ucs_mpool_t am_mp; /* Memory pool for AM receives */ UCS_STATS_NODE_DECLARE(stats); unsigned ep_config_max; /* Maximal number of configurations */ @@ -131,4 +142,10 @@ static inline ucp_ep_h ucp_worker_ep_find(ucp_worker_h worker, uint64_t dest_uui return kh_value(&worker->ep_hash, hash_it); } +static UCS_F_ALWAYS_INLINE +uint64_t ucp_worker_is_tl_tag_offload(ucp_worker_h worker, ucp_rsc_index_t rsc_index) +{ + return 0; /* Stub for now, offload TM proto is not implemented yet */ +} + #endif diff --git a/src/ucp/proto/proto.h b/src/ucp/proto/proto.h index 53536b7b830..21d4408941a 100644 --- a/src/ucp/proto/proto.h +++ b/src/ucp/proto/proto.h @@ -36,7 +36,7 @@ typedef struct { /** * Defines functions for a protocol, on all possible data types. 
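The typedef for this struct moves to ucp_types.h as a forward declaration (typedef struct ucp_proto ucp_proto_t), so headers such as ucp_ep.h can hold const ucp_proto_t pointers without pulling in proto.h. The pattern in miniature, with trimmed, illustrative fields:

    #include <stddef.h>

    /* "types" header: a forward declaration is enough to hold a pointer */
    typedef struct proto proto_t;

    typedef struct config {
        const proto_t *proto;       /* AM-eager or tag-offload table */
        const proto_t *sync_proto;
    } config_t;

    /* "protocol" header: the actual dispatch table */
    struct proto {
        int    (*bcopy_single)(void *req);
        int    (*bcopy_multi)(void *req);
        size_t only_hdr_size;
    };

Selecting the protocol once per endpoint configuration keeps the tag send path free of per-message branching on the offload mode.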
*/ -typedef struct ucp_proto { +struct ucp_proto { uct_pending_callback_t contig_short; /**< Progress short data */ uct_pending_callback_t bcopy_single; /**< Progress bcopy single fragment */ uct_pending_callback_t bcopy_multi; /**< Progress bcopy multi-fragment */ @@ -46,7 +46,7 @@ typedef struct ucp_proto { size_t only_hdr_size; /**< Header size for single / short */ size_t first_hdr_size; /**< Header size for first of multi */ size_t mid_hdr_size; /**< Header size for rest of multi */ -} ucp_proto_t; +}; ucs_status_t ucp_proto_progress_am_bcopy_single(uct_pending_req_t *self); diff --git a/src/ucp/proto/proto_am.inl b/src/ucp/proto/proto_am.inl index b291148788d..0d038337e40 100644 --- a/src/ucp/proto/proto_am.inl +++ b/src/ucp/proto/proto_am.inl @@ -153,7 +153,7 @@ ucs_status_t ucp_do_am_zcopy_single(uct_pending_req_t *self, uint8_t am_id, { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; - size_t max_iov = ucp_ep_config(ep)->am.max_iovcnt; + size_t max_iov = ucp_ep_config(ep)->am.max_iov; uct_iov_t *iov = ucs_alloca(max_iov * sizeof(uct_iov_t)); size_t iovcnt = 0; ucp_dt_state_t saved_state; @@ -189,7 +189,7 @@ ucs_status_t ucp_do_am_zcopy_multi(uct_pending_req_t *self, uint8_t am_id_first, ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; const size_t max_middle = ucp_ep_config(ep)->am.max_zcopy - hdr_size_middle; - const size_t max_iov = ucp_ep_config(ep)->am.max_iovcnt; + const size_t max_iov = ucp_ep_config(ep)->am.max_iov; uct_iov_t *iov = ucs_alloca(max_iov * sizeof(uct_iov_t)); ucp_dt_state_t *state = &req->send.state; unsigned flag_iov_mid = 0; diff --git a/src/ucp/rma/basic_rma.c b/src/ucp/rma/basic_rma.c index 01342195ead..946080403eb 100644 --- a/src/ucp/rma/basic_rma.c +++ b/src/ucp/rma/basic_rma.c @@ -404,11 +404,11 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_fence, (worker), ucp_worker_h worker) UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) { - if (worker->ifaces[rsc_index] == NULL) { + if (worker->ifaces[rsc_index].iface == NULL) { continue; } - status = uct_iface_fence(worker->ifaces[rsc_index], 0); + status = uct_iface_fence(worker->ifaces[rsc_index].iface, 0); if (status != UCS_OK) { goto out; } @@ -432,11 +432,11 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker) /* TODO flush in parallel */ for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) { - if (worker->ifaces[rsc_index] == NULL) { + if (worker->ifaces[rsc_index].iface == NULL) { continue; } - while (uct_iface_flush(worker->ifaces[rsc_index], 0, NULL) != UCS_OK) { + while (uct_iface_flush(worker->ifaces[rsc_index].iface, 0, NULL) != UCS_OK) { ucp_worker_progress(worker); } } diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 3b865439bdd..a041c033e13 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -225,21 +225,21 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), if (!(ucp_tag_rndv_is_get_op_possible(rndv_req->send.ep, rndv_req->send.rndv_get.rkey_bundle.rkey))) { /* can't perform get_zcopy - switch to AM rndv */ + if (rndv_req->send.rndv_get.rkey_bundle.rkey != UCT_INVALID_RKEY) { + uct_rkey_release(&rndv_req->send.rndv_get.rkey_bundle); + } ucp_rndv_recv_am(rndv_req, rndv_req->send.rndv_get.rreq, rndv_req->send.rndv_get.remote_request, rndv_req->send.length); - if (rndv_req->send.rndv_get.rkey_bundle.rkey != UCT_INVALID_RKEY) { - 
uct_rkey_release(&rndv_req->send.rndv_get.rkey_bundle); - } return UCS_INPROGRESS; } /* reset the lane to rndv since it might have been set to 0 since it was stub on RTS receive */ rndv_req->send.lane = ucp_ep_get_rndv_get_lane(rndv_req->send.ep); rsc_index = ucp_ep_get_rsc_index(rndv_req->send.ep, rndv_req->send.lane); - align = rndv_req->send.ep->worker->iface_attrs[rsc_index].cap.get.opt_zcopy_align; - ucp_mtu = rndv_req->send.ep->worker->iface_attrs[rsc_index].cap.get.align_mtu; + align = rndv_req->send.ep->worker->ifaces[rsc_index].attr.cap.get.opt_zcopy_align; + ucp_mtu = rndv_req->send.ep->worker->ifaces[rsc_index].attr.cap.get.align_mtu; ucs_trace_data("ep: %p try to progress get_zcopy for rndv get. rndv_req: %p. lane: %d", rndv_req->send.ep, rndv_req, rndv_req->send.lane); @@ -259,7 +259,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_get_zcopy, (self), length = ucp_mtu - ((uintptr_t)rndv_req->send.buffer % align); } else { length = ucs_min(rndv_req->send.length - offset, - ucp_ep_config(rndv_req->send.ep)->rndv.max_get_zcopy); + ucp_ep_config(rndv_req->send.ep)->tag.rndv.max_get_zcopy); } ucs_trace_data("offset %zu remainder %zu. read to %p len %zu", @@ -407,7 +407,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), * with AM messages */ ucp_rndv_handle_recv_am(rndv_req, rreq, rndv_rts_hdr); } - } else if (UCP_DT_IS_GENERIC(rreq->recv.datatype)) { + } else if (UCP_DT_IS_GENERIC(rreq->recv.datatype) || + UCP_DT_IS_IOV(rreq->recv.datatype) ) { /* if the recv side has a generic datatype, * send an RTR and the sender will send the data with AM messages */ ucp_rndv_handle_recv_am(rndv_req, rreq, rndv_rts_hdr); @@ -437,6 +438,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler, rndv_rts_hdr->size, recv_flags); if (rreq != NULL) { ucp_rndv_matched(worker, rreq, rndv_rts_hdr); + UCP_WORKER_STAT_RNDV(worker, EXP); status = UCS_OK; } else { diff --git a/src/ucp/tag/tag_match.c b/src/ucp/tag/tag_match.c index 970c8b34a2e..7ebbf7c5b5d 100644 --- a/src/ucp/tag/tag_match.c +++ b/src/ucp/tag/tag_match.c @@ -27,6 +27,8 @@ ucs_status_t ucp_tag_match_init(ucp_tag_match_t *tm) } ucs_queue_head_init(&tm->unexpected); + ucs_queue_head_init(&tm->offload_ifaces); + tm->post_thresh = SIZE_MAX; return UCS_OK; } diff --git a/src/ucp/tag/tag_match.h b/src/ucp/tag/tag_match.h index b637b37f0a5..07a9749d284 100644 --- a/src/ucp/tag/tag_match.h +++ b/src/ucp/tag/tag_match.h @@ -34,6 +34,10 @@ typedef struct ucp_tag_match { uint64_t sn; } expected; ucs_queue_head_t unexpected; /* Unexpected received descriptors */ + + /* Tag offload fields */ + ucs_queue_head_t offload_ifaces; /* Interfaces which support tag offload */ + size_t post_thresh; } ucp_tag_match_t; diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index 94f2f96d759..10d01540361 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -45,6 +45,7 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, ucp_tag_log_match(recv_tag, rdesc->length - rdesc->hdr_len, req, tag, tag_mask, req->recv.state.offset, "unexpected"); ucs_queue_del_iter(&context->tm.unexpected, iter); + if (rdesc->flags & UCP_RECV_DESC_FLAG_EAGER) { UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0); status = ucp_eager_unexp_match(worker, rdesc, recv_tag, flags, @@ -163,7 +164,9 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t buffer_size, req->recv.tag = tag; req->recv.tag_mask = tag_mask; req->recv.cb = cb; + ucp_tag_exp_add(&worker->context->tm, req); + ucs_trace_req("%s returning 
expected request %p (%p)", debug_name, req, req + 1); } diff --git a/src/ucp/tag/tag_send.c b/src/ucp/tag/tag_send.c index fb9dfb70932..5990f0fe881 100644 --- a/src/ucp/tag/tag_send.c +++ b/src/ucp/tag/tag_send.c @@ -23,7 +23,7 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count, const ucp_proto_t *proto) { ucp_ep_config_t *config = ucp_ep_config(req->send.ep); - ucp_lane_index_t lane = ucp_ep_get_am_lane(req->send.ep); + ucp_lane_index_t lane = config->tag.lane; ucp_worker_h worker = req->send.ep->worker; size_t only_hdr_size = proto->only_hdr_size; unsigned flag_iov_single = 1; @@ -40,11 +40,12 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count, req->send.state.dt.iov.iovcnt_offset = 0; req->send.state.dt.iov.iov_offset = 0; req->send.state.dt.iov.iovcnt = count; - flag_iov_single = (count <= config->am.max_iovcnt); + flag_iov_single = (count <= config->tag.eager.max_iov) || + config->tag.offload.enabled; if (0 == count) { /* disable zcopy */ zcopy_thresh = SIZE_MAX; - } else if (!config->am.zcopy_auto_thresh) { + } else if (!config->tag.eager.zcopy_auto_thresh) { /* The user defined threshold or no zcopy enabled */ zcopy_thresh = zcopy_thresh_arr[0]; } else if (count <= UCP_MAX_IOV) { @@ -56,7 +57,7 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count, zcopy_thresh = ucp_ep_config_get_zcopy_auto_thresh(count, &ucp_ep_md_attr(req->send.ep, lane)->reg_cost, worker->context, - worker->iface_attrs[rsc_index].bandwidth); + worker->ifaces[rsc_index].attr.bandwidth); } } else { length = ucp_contig_dt_length(req->send.datatype, count); @@ -74,15 +75,13 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count, /* short */ req->send.uct.func = proto->contig_short; UCS_PROFILE_REQUEST_EVENT(req, "start_contig_short", req->send.length); - } else if ((((config->key.rndv_lane != UCP_NULL_RESOURCE) && - (length >= rndv_rma_thresh)) || - (length >= rndv_am_thresh)) && !is_iov) { + } else if ((length >= rndv_rma_thresh) || (length >= rndv_am_thresh)) { /* RMA/AM rendezvous */ ucp_tag_send_start_rndv(req); UCS_PROFILE_REQUEST_EVENT(req, "start_rndv", req->send.length); } else if (length < zcopy_thresh) { /* bcopy */ - if (length <= (config->am.max_bcopy - only_hdr_size)) { + if (length <= (config->tag.eager.max_bcopy - only_hdr_size)) { req->send.uct.func = proto->bcopy_single; UCS_PROFILE_REQUEST_EVENT(req, "start_egr_bcopy_single", req->send.length); } else { @@ -99,7 +98,7 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count, req->send.uct_comp.func = proto->zcopy_completion; req->send.uct_comp.count = 1; - if ((length <= (config->am.max_zcopy - only_hdr_size)) && + if ((length <= (config->tag.eager.max_zcopy - only_hdr_size)) && flag_iov_single) { req->send.uct.func = proto->zcopy_single; UCS_PROFILE_REQUEST_EVENT(req, "start_egr_zcopy_single", req->send.length); @@ -126,14 +125,14 @@ static void ucp_tag_req_start_generic(ucp_request_t *req, size_t count, req->send.state.dt.generic.state = state; req->send.length = length = dt_gen->ops.packed_size(state); - if (length >= rndv_am_thresh) { - /* rendezvous */ - ucp_tag_send_start_rndv(req); - UCS_PROFILE_REQUEST_EVENT(req, "start_rndv", req->send.length); - } else if (length <= config->am.max_bcopy - proto->only_hdr_size) { + if (length <= config->tag.eager.max_bcopy - proto->only_hdr_size) { /* bcopy single */ req->send.uct.func = proto->bcopy_single; UCS_PROFILE_REQUEST_EVENT(req, "start_gen_bcopy_single", req->send.length); + } else if ((length >= 
rndv_am_thresh) || config->tag.offload.enabled) { + /* rendezvous */ + ucp_tag_send_start_rndv(req); + UCS_PROFILE_REQUEST_EVENT(req, "start_rndv", req->send.length); } else { /* bcopy multi */ req->send.uct.func = proto->bcopy_multi; @@ -231,7 +230,7 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_nb, if (ucs_likely(UCP_DT_IS_CONTIG(datatype))) { length = ucp_contig_dt_length(datatype, count); - if (ucs_likely((ssize_t)length <= ucp_ep_config(ep)->am.max_eager_short)) { + if (ucs_likely((ssize_t)length <= ucp_ep_config(ep)->tag.eager.max_short)) { status = UCS_PROFILE_CALL(ucp_tag_send_eager_short, ep, tag, buffer, length); if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) { @@ -251,11 +250,11 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_nb, ucp_tag_send_req_init(req, ep, buffer, datatype, tag, 0); ret = ucp_tag_send_req(req, count, - ucp_ep_config(ep)->am.max_eager_short, - ucp_ep_config(ep)->am.zcopy_thresh, - ucp_ep_config(ep)->rndv.rma_thresh, - ucp_ep_config(ep)->rndv.am_thresh, - cb, &ucp_tag_eager_proto); + ucp_ep_config(ep)->tag.eager.max_short, + ucp_ep_config(ep)->tag.eager.zcopy_thresh, + ucp_ep_config(ep)->tag.rndv.rma_thresh, + ucp_ep_config(ep)->tag.rndv.am_thresh, + cb, ucp_ep_config(ep)->tag.proto); out: UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); return ret; @@ -292,10 +291,10 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_sync_nb, ret = ucp_tag_send_req(req, count, -1, /* disable short method */ - ucp_ep_config(ep)->am.sync_zcopy_thresh, - ucp_ep_config(ep)->rndv.rma_thresh, - ucp_ep_config(ep)->rndv.am_thresh, - cb, &ucp_tag_eager_sync_proto); + ucp_ep_config(ep)->tag.eager.sync_zcopy_thresh, + ucp_ep_config(ep)->tag.rndv.rma_thresh, + ucp_ep_config(ep)->tag.rndv.am_thresh, + cb, ucp_ep_config(ep)->tag.sync_proto); out: UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); return ret; diff --git a/src/ucp/wireup/address.c b/src/ucp/wireup/address.c index 308ad9f813b..00cdf07bf8f 100644 --- a/src/ucp/wireup/address.c +++ b/src/ucp/wireup/address.c @@ -135,7 +135,7 @@ ucp_address_gather_devices(ucp_worker_h worker, uint64_t tl_bitmap, int has_ep, dev = ucp_address_get_device(context->tl_rscs[i].tl_rsc.dev_name, devices, &num_devices); - iface_attr = &worker->iface_attrs[i]; + iface_attr = &worker->ifaces[i].attr; if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) { dev->tl_addrs_size += iface_attr->iface_addr_len; } else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) { @@ -324,7 +324,7 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep, ++ptr; /* Device address */ - status = uct_iface_get_device_address(worker->ifaces[dev->rsc_index], + status = uct_iface_get_device_address(worker->ifaces[dev->rsc_index].iface, (uct_device_addr_t*)ptr); if (status != UCS_OK) { return status; @@ -345,17 +345,17 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep, ptr += sizeof(uint16_t); /* Transport information */ - ucp_address_pack_iface_attr(ptr, &worker->iface_attrs[i], + ucp_address_pack_iface_attr(ptr, &worker->ifaces[i].attr, worker->atomic_tls & UCS_BIT(i)); ucp_address_memchek(ptr, sizeof(ucp_address_packed_iface_attr_t), &context->tl_rscs[dev->rsc_index].tl_rsc); ptr += sizeof(ucp_address_packed_iface_attr_t); /* Transport address length */ - iface_attr = &worker->iface_attrs[i]; + iface_attr = &worker->ifaces[i].attr; if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) { tl_addr_len = iface_attr->iface_addr_len; - status = uct_iface_get_address(worker->ifaces[i], + status = 
uct_iface_get_address(worker->ifaces[i].iface, (uct_iface_addr_t*)(ptr + 1)); } else if (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) { if (ep == NULL) { @@ -390,9 +390,9 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep, " md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64" bw %e ovh %e ", index, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[i].tl_rsc), - md_flags, worker->iface_attrs[i].cap.flags, - worker->iface_attrs[i].bandwidth, - worker->iface_attrs[i].overhead); + md_flags, worker->ifaces[i].attr.cap.flags, + worker->ifaces[i].attr.bandwidth, + worker->ifaces[i].attr.overhead); ++index; } } diff --git a/src/ucp/wireup/address.h b/src/ucp/wireup/address.h index bcd24447cf2..41291520b85 100644 --- a/src/ucp/wireup/address.h +++ b/src/ucp/wireup/address.h @@ -24,6 +24,7 @@ enum { UCT_IFACE_FLAG_PUT_BCOPY | UCT_IFACE_FLAG_GET_BCOPY | UCT_IFACE_FLAG_GET_ZCOPY | + UCT_IFACE_FLAG_TAG_EAGER_BCOPY | UCP_UCT_IFACE_ATOMIC32_FLAGS | UCP_UCT_IFACE_ATOMIC64_FLAGS | UCT_IFACE_FLAG_ATOMIC_FADD64 | diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index eef9070fd9e..169458c17cb 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -8,6 +8,7 @@ #include "address.h" #include +#include #include #include #include @@ -18,7 +19,8 @@ enum { UCP_WIREUP_LANE_USAGE_AM = UCS_BIT(0), UCP_WIREUP_LANE_USAGE_RMA = UCS_BIT(1), UCP_WIREUP_LANE_USAGE_AMO = UCS_BIT(2), - UCP_WIREUP_LANE_USAGE_RNDV = UCS_BIT(3) + UCP_WIREUP_LANE_USAGE_RNDV = UCS_BIT(3), + UCP_WIREUP_LANE_USAGE_TAG = UCS_BIT(4) }; @@ -61,7 +63,11 @@ static const char *ucp_wireup_iface_flags[] = { [ucs_ilog2(UCT_IFACE_FLAG_AM_CB_SYNC)] = "sync am callback", [ucs_ilog2(UCT_IFACE_FLAG_AM_CB_ASYNC)] = "async am callback", [ucs_ilog2(UCT_IFACE_FLAG_WAKEUP)] = "wakeup", - [ucs_ilog2(UCT_IFACE_FLAG_PENDING)] = "pending" + [ucs_ilog2(UCT_IFACE_FLAG_PENDING)] = "pending", + [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_SHORT)] = "tag eager short", + [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_BCOPY)] = "tag eager bcopy", + [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY)] = "tag eager zcopy", + [ucs_ilog2(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY)] = "tag rndv zcopy" }; static double ucp_wireup_aux_score_func(ucp_context_h context, @@ -118,7 +124,8 @@ static int ucp_wireup_is_reachable(ucp_worker_h worker, ucp_rsc_index_t rsc_inde { ucp_context_h context = worker->context; return (context->tl_rscs[rsc_index].tl_name_csum == ae->tl_name_csum) && - uct_iface_is_reachable(worker->ifaces[rsc_index], ae->dev_addr, ae->iface_addr); + uct_iface_is_reachable(worker->ifaces[rsc_index].iface, ae->dev_addr, + ae->iface_addr); } /** @@ -199,7 +206,7 @@ ucp_wireup_select_transport(ucp_ep_h ep, const ucp_address_entry_t *address_list * has a reachable tl on the remote peer */ for (rsc_index = 0; addr_index_map && (rsc_index < context->num_tls); ++rsc_index) { resource = &context->tl_rscs[rsc_index].tl_rsc; - iface_attr = &worker->iface_attrs[rsc_index]; + iface_attr = &worker->ifaces[rsc_index].attr; md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; /* Check that local md and interface satisfy the criteria */ @@ -647,6 +654,49 @@ static ucs_status_t ucp_wireup_add_rndv_lane(ucp_ep_h ep, unsigned address_count return UCS_OK; } +/* Lane for transport offloaded tag interface */ +static ucs_status_t ucp_wireup_add_tag_lane(ucp_ep_h ep, unsigned address_count, + const ucp_address_entry_t *address_list, + ucp_wireup_lane_desc_t *lane_descs, + ucp_lane_index_t *num_lanes_p) +{ + ucp_wireup_criteria_t criteria; + ucp_rsc_index_t rsc_index; + 
ucs_status_t status; + unsigned addr_index; + double score; + + if (!(ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG) || + ucs_queue_is_empty(&ep->worker->context->tm.offload_ifaces)) { + return UCS_OK; + } + + criteria.title = "tag_offload"; + criteria.local_md_flags = UCT_MD_FLAG_REG; /* needed for posting tags to HW */ + criteria.remote_md_flags = UCT_MD_FLAG_REG; /* needed for posting tags to HW */ + criteria.remote_iface_flags = UCT_IFACE_FLAG_TAG_EAGER_BCOPY; + criteria.local_iface_flags = UCT_IFACE_FLAG_TAG_EAGER_BCOPY | + UCT_IFACE_FLAG_PENDING; + + /* Use RMA score func for now (to target mid size messages). + * TODO: have to align to TM_THRESH value. */ + criteria.calc_score = ucp_wireup_rma_score_func; + + if (ucs_test_all_flags(ucp_ep_get_context_features(ep), UCP_FEATURE_WAKEUP)) { + criteria.remote_iface_flags |= UCT_IFACE_FLAG_WAKEUP; + } + + status = ucp_wireup_select_transport(ep, address_list, address_count, &criteria, + -1, -1, 0, &rsc_index, &addr_index, &score); + if (status == UCS_OK) { + ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index, + address_list[addr_index].md_index, score, + UCP_WIREUP_LANE_USAGE_TAG); + } + + return UCS_OK; +} + static ucp_lane_index_t ucp_wireup_select_wireup_msg_lane(ucp_worker_h worker, const ucp_address_entry_t *address_list, @@ -668,7 +718,7 @@ ucp_wireup_select_wireup_msg_lane(ucp_worker_h worker, /* if the current lane satisfies the wireup criteria, choose it for wireup. * if it doesn't take a lane with a p2p transport */ if (ucp_wireup_check_flags(resource, - worker->iface_attrs[rsc_index].cap.flags, + worker->ifaces[rsc_index].attr.cap.flags, ucp_wireup_aux_criteria.local_iface_flags, ucp_wireup_aux_criteria.title, ucp_wireup_iface_flags, NULL, 0) && @@ -744,6 +794,12 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, unsigned address_count, return status; } + status = ucp_wireup_add_tag_lane(ep, address_count, address_list, + lane_descs, &key->num_lanes); + if (status != UCS_OK) { + return status; + } + /* User should not create endpoints unless requested communication features */ if (key->num_lanes == 0) { ucs_error("No transports selected to %s (features: 0x%lx)", @@ -776,6 +832,10 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, unsigned address_count, if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_AMO) { key->amo_lanes[lane] = lane; } + if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_TAG) { + ucs_assert(key->tag_lane == UCP_NULL_LANE); + key->tag_lane = lane; + } } /* Sort RMA and AMO lanes according to score */ diff --git a/src/ucp/wireup/stub_ep.c b/src/ucp/wireup/stub_ep.c index 6d9b4135232..b2e1f5917e1 100644 --- a/src/ucp/wireup/stub_ep.c +++ b/src/ucp/wireup/stub_ep.c @@ -249,6 +249,10 @@ static uct_iface_t ucp_stub_iface = { .ep_am_short = (void*)ucp_stub_ep_send_func, .ep_am_bcopy = ucp_stub_ep_am_bcopy, .ep_am_zcopy = (void*)ucp_stub_ep_send_func, + .ep_tag_eager_short = (void*)ucp_stub_ep_send_func, + .ep_tag_eager_bcopy = (void*)ucp_stub_ep_bcopy_send_func, + .ep_tag_eager_zcopy = (void*)ucp_stub_ep_send_func, + .ep_tag_rndv_zcopy = (void*)ucs_empty_function_return_ptr_no_resource, .ep_atomic_add64 = (void*)ucp_stub_ep_send_func, .ep_atomic_fadd64 = (void*)ucp_stub_ep_send_func, .ep_atomic_swap64 = (void*)ucp_stub_ep_send_func, @@ -286,7 +290,7 @@ ucp_stub_ep_connect_aux(ucp_stub_ep_t *stub_ep, unsigned address_count, aux_addr = &address_list[aux_addr_index]; /* create auxiliary endpoint connected to the remote iface. 
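The eligibility test in ucp_wireup_add_tag_lane() above condenses to a flag check on both sides. A sketch with hypothetical flag constants in place of the UCT_IFACE_FLAG_* values (the UCT_MD_FLAG_REG requirement on both sides is left out):

    #include <stdint.h>

    #define F_TAG_EAGER_BCOPY (UINT64_C(1) << 0)
    #define F_PENDING         (UINT64_C(1) << 1)
    #define F_WAKEUP          (UINT64_C(1) << 2)

    static int iface_fits_tag_lane(uint64_t local_flags, uint64_t remote_flags,
                                   int need_wakeup)
    {
        uint64_t local_req  = F_TAG_EAGER_BCOPY | F_PENDING;
        uint64_t remote_req = F_TAG_EAGER_BCOPY |
                              (need_wakeup ? F_WAKEUP : 0);

        return ((local_flags  & local_req)  == local_req) &&
               ((remote_flags & remote_req) == remote_req);
    }

Scoring then reuses ucp_wireup_rma_score_func(), i.e. the lane is chosen as if it were a mid-size-message RMA lane; the TODO above notes this should eventually be aligned with the TM_THRESH value.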
*/ - status = uct_ep_create_connected(worker->ifaces[stub_ep->aux_rsc_index], + status = uct_ep_create_connected(worker->ifaces[stub_ep->aux_rsc_index].iface, aux_addr->dev_addr, aux_addr->iface_addr, &stub_ep->aux_ep); if (status != UCS_OK) { @@ -358,7 +362,7 @@ ucs_status_t ucp_stub_ep_connect(uct_ep_h uct_ep, ucp_rsc_index_t rsc_index, ucs_assert(ucp_stub_ep_test(uct_ep)); - status = uct_ep_create(worker->ifaces[rsc_index], &stub_ep->next_ep); + status = uct_ep_create(worker->ifaces[rsc_index].iface, &stub_ep->next_ep); if (status != UCS_OK) { goto err; } diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 138e155c762..a2153d78876 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -341,7 +341,7 @@ static ucs_status_t ucp_wireup_connect_lane(ucp_ep_h ep, ucp_lane_index_t lane, { ucp_worker_h worker = ep->worker; ucp_rsc_index_t rsc_index = ucp_ep_get_rsc_index(ep, lane); - uct_iface_attr_t *iface_attr = &worker->iface_attrs[rsc_index]; + uct_iface_attr_t *iface_attr = &worker->ifaces[rsc_index].attr; uct_ep_h new_uct_ep; ucs_status_t status; @@ -353,7 +353,7 @@ static ucs_status_t ucp_wireup_connect_lane(ucp_ep_h ep, ucp_lane_index_t lane, ((ep->uct_eps[lane] == NULL) || ucp_stub_ep_test(ep->uct_eps[lane]))) { /* create an endpoint connected to the remote interface */ - status = uct_ep_create_connected(worker->ifaces[rsc_index], + status = uct_ep_create_connected(worker->ifaces[rsc_index].iface, address_list[addr_index].dev_addr, address_list[addr_index].iface_addr, &new_uct_ep); diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index 3197f17d0f3..f0c4d911ae7 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -91,7 +91,7 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, unsigned address_count, static inline int ucp_worker_is_tl_p2p(ucp_worker_h worker, ucp_rsc_index_t rsc_index) { - return !(worker->iface_attrs[rsc_index].cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE); + return !(worker->ifaces[rsc_index].attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE); } diff --git a/src/ucs/sys/sys.c b/src/ucs/sys/sys.c index 6c4fccdf5e7..c35655fe0db 100644 --- a/src/ucs/sys/sys.c +++ b/src/ucs/sys/sys.c @@ -721,6 +721,11 @@ ucs_status_t ucs_empty_function_return_no_resource() return UCS_ERR_NO_RESOURCE; } +ucs_status_ptr_t ucs_empty_function_return_ptr_no_resource() +{ + return UCS_STATUS_PTR(UCS_ERR_NO_RESOURCE); +} + ucs_status_t ucs_empty_function_return_ep_timeout() { return UCS_ERR_ENDPOINT_TIMEOUT; diff --git a/src/ucs/sys/sys.h b/src/ucs/sys/sys.h index 3899304e80e..95512c41c5e 100644 --- a/src/ucs/sys/sys.h +++ b/src/ucs/sys/sys.h @@ -250,6 +250,7 @@ ucs_status_t ucs_empty_function_return_success(); ucs_status_t ucs_empty_function_return_unsupported(); ucs_status_t ucs_empty_function_return_inprogress(); ucs_status_t ucs_empty_function_return_no_resource(); +ucs_status_ptr_t ucs_empty_function_return_ptr_no_resource(); ucs_status_t ucs_empty_function_return_ep_timeout(); ssize_t ucs_empty_function_return_bc_ep_timeout(); ucs_status_t ucs_empty_function_return_busy(); diff --git a/test/gtest/ucp/test_ucp_wireup.cc b/test/gtest/ucp/test_ucp_wireup.cc index 211cc90d0b9..84459c37aef 100644 --- a/test/gtest/ucp/test_ucp_wireup.cc +++ b/test/gtest/ucp/test_ucp_wireup.cc @@ -261,7 +261,7 @@ UCS_TEST_P(test_ucp_wireup, address) { ASSERT_GT(size, 0ul); EXPECT_LE(size, 512ul); /* Expect a reasonable address size */ for (tl = 0; tl < sender().worker()->context->num_tls; tl++) { - 
packed_dev_priorities.insert(sender().worker()->iface_attrs[tl].priority); + packed_dev_priorities.insert(sender().worker()->ifaces[tl].attr.priority); } char name[UCP_WORKER_NAME_MAX]; From e5411f1d7a5e511a7a2d8258d8c437c7e74e953e Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Thu, 4 May 2017 13:12:51 +0300 Subject: [PATCH 2/4] UCP: tm offload fixes p1 --- src/ucp/core/ucp_ep.c | 3 ++- src/ucp/tag/rndv.c | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 79f376088bb..3b4ebb7bc01 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -773,6 +773,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) config->tag.lane = UCP_NULL_LANE; config->tag.proto = &ucp_tag_eager_proto; config->tag.sync_proto = &ucp_tag_eager_sync_proto; + config->tag.offload.enabled = 0; config->tag.rndv.rma_thresh = SIZE_MAX; config->tag.rndv.max_get_zcopy = SIZE_MAX; config->tag.rndv.am_thresh = SIZE_MAX; @@ -823,7 +824,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) rsc_index = config->key.lanes[lane].rsc_index; if (rsc_index != UCP_NULL_RESOURCE) { iface_attr = &worker->ifaces[rsc_index].attr; - md_attr = &context->tl_mds[rsc_index].attr; + md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; ucp_ep_config_init_attrs(worker, rsc_index, &config->am, iface_attr->cap.am.max_short, iface_attr->cap.am.max_bcopy, diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index a041c033e13..1db6ea2bdc6 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -72,7 +72,8 @@ static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) rndv_rts_hdr->sreq.reqptr = (uintptr_t)sreq; rndv_rts_hdr->sreq.sender_uuid = sreq->send.ep->worker->uuid; rndv_rts_hdr->size = sreq->send.length; - if (UCP_DT_IS_CONTIG(sreq->send.datatype)) { + if (UCP_DT_IS_CONTIG(sreq->send.datatype) || + UCP_DT_IS_IOV(sreq->send.datatype)) { rndv_rts_hdr->address = (uintptr_t) sreq->send.buffer; packed_len += ucp_tag_rndv_pack_rkey(sreq, rndv_rts_hdr); } else if (UCP_DT_IS_GENERIC(sreq->send.datatype)) { From 9e345d65b738ad524b3f1eb7b9ed78745a9c4d3c Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Mon, 22 May 2017 16:37:48 +0300 Subject: [PATCH 3/4] UCP: tm offload fixes p2 --- src/ucp/core/ucp_context.c | 15 ++++++ src/ucp/core/ucp_context.h | 2 + src/ucp/core/ucp_ep.c | 106 +++++++++++++++++++------------------ src/ucp/core/ucp_ep.h | 6 +-- src/ucp/core/ucp_ep.inl | 13 ++++- src/ucp/core/ucp_worker.c | 15 +----- src/ucp/tag/rndv.c | 3 +- src/ucp/tag/tag_send.c | 16 +++--- src/ucp/wireup/address.h | 1 + 9 files changed, 100 insertions(+), 77 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index 55653aaa660..80bf3dc7fa2 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -929,3 +929,18 @@ void ucp_context_print_info(ucp_context_h context, FILE *stream) fprintf(stream, "#\n"); } + + +void ucp_context_tag_offload_enable(ucp_context_h context) +{ + if (ucs_queue_length(&context->tm.offload_ifaces) == 1) { + /* Enable offload, because just one tag offload capable interface is present */ + context->tm.post_thresh = ucs_max(context->config.ext.tm_thresh, + sizeof(ucp_request_hdr_t)); + } else { + /* Some offload interface/s already configured. Disable TM receive offload, + * because multiple offload ifaces are not supported yet. 
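Worth calling out the one-liner in the ucp_ep.c hunk of patch 2 above: tl_mds must be indexed by md_index, not by rsc_index, because transport resources and memory domains live in different index spaces (several TLs can share one MD). The fixed mapping, sketched with simplified types:

    typedef struct { int md_index; } tl_resource_t;
    typedef struct { int reg_cost; } md_entry_t;

    static int md_reg_cost(const tl_resource_t *tl_rscs,
                           const md_entry_t *tl_mds, int rsc_index)
    {
        /* wrong:  tl_mds[rsc_index]  (a TL index into the MD array)
         * right:  map through the resource's md_index first */
        return tl_mds[tl_rscs[rsc_index].md_index].reg_cost;
    }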
+        context->tm.post_thresh = SIZE_MAX;
+    }
+}
+
diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h
index 2c12286cb9d..5eb45279db1 100644
--- a/src/ucp/core/ucp_context.h
+++ b/src/ucp/core/ucp_context.h
@@ -164,6 +164,8 @@ extern ucp_am_handler_t ucp_am_handlers[];
 void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max,
                       const void *data, size_t length);

+void ucp_context_tag_offload_enable(ucp_context_h context);
+
 uint64_t ucp_context_uct_atomic_iface_flags(ucp_context_h context);

 const char * ucp_find_tl_name_by_csum(ucp_context_t *context, uint16_t tl_name_csum);

diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c
index 3b4ebb7bc01..4a37a63cd56 100644
--- a/src/ucp/core/ucp_ep.c
+++ b/src/ucp/core/ucp_ep.c
@@ -671,34 +671,34 @@ static void ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker,
     uct_iface_attr_t *iface_attr;
     uct_md_attr_t *md_attr;

-    if (lane != UCP_NULL_LANE) {
-        rsc_index = config->key.lanes[lane].rsc_index;
-        if (rsc_index != UCP_NULL_RESOURCE) {
-            iface_attr = &worker->ifaces[rsc_index].attr;
-            md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;
-            ucs_assert_always(iface_attr->cap.flags & rndv_cap_flag);
-
-            if (context->config.ext.rndv_thresh == UCS_CONFIG_MEMUNITS_AUTO) {
-                /* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own.*/
-                rndv_thresh = ucp_ep_config_calc_rndv_thresh(context, iface_attr,
-                                                             md_attr, SIZE_MAX, 1);
-            } else {
-                /* In order to disable rendezvous, need to set the threshold to
-                 * infinite (-1).
-                 */
-                rndv_thresh = context->config.ext.rndv_thresh;
-            }
+    if (lane == UCP_NULL_LANE) {
+        ucs_debug("rendezvous (get_zcopy) protocol is not supported");
+        return;
+    }

-            /* use rendezvous only starting from minimal zero-copy get size */
-            ucs_assert(iface_attr->cap.get.min_zcopy <= iface_attr->cap.get.max_zcopy);
-            rndv_thresh = ucs_max(rndv_thresh, iface_attr->cap.get.min_zcopy);
+    rsc_index = config->key.lanes[lane].rsc_index;
+    if (rsc_index == UCP_NULL_RESOURCE) {
+        return;
+    }

-            config->tag.rndv.max_get_zcopy = iface_attr->cap.get.max_zcopy;
-            config->tag.rndv.rma_thresh = ucs_min(rndv_thresh, adjust_min_val);
-        } else {
-            ucs_debug("rendezvous (get_zcopy) protocol is not supported ");
-        }
+    iface_attr = &worker->ifaces[rsc_index].attr;
+    md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;
+    ucs_assert_always(iface_attr->cap.flags & rndv_cap_flag);
+
+    if (context->config.ext.rndv_thresh == UCS_CONFIG_MEMUNITS_AUTO) {
+        /* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own. */
+        rndv_thresh = ucp_ep_config_calc_rndv_thresh(context, iface_attr,
+                                                     md_attr, SIZE_MAX, 1);
+    } else {
+        rndv_thresh = context->config.ext.rndv_thresh;
     }
+
+    /* use rendezvous only starting from minimal zero-copy get size */
+    ucs_assert(iface_attr->cap.get.min_zcopy <= iface_attr->cap.get.max_zcopy);
+    rndv_thresh = ucs_max(rndv_thresh, iface_attr->cap.get.min_zcopy);
+
+    config->tag.rndv.max_get_zcopy = iface_attr->cap.get.max_zcopy;
+    config->tag.rndv.rma_thresh = ucs_min(rndv_thresh, adjust_min_val);
 }

 static void ucp_ep_config_init_attrs(ucp_worker_t *worker, ucp_rsc_index_t rsc_index,
@@ -714,7 +714,6 @@ static void ucp_ep_config_init_attrs(ucp_worker_t *worker, ucp_rsc_index_t rsc_i
     size_t it;
     size_t zcopy_thresh;

-
     if (iface_attr->cap.flags & short_flag) {
         config->max_short = max_short - hdr_len;
     } else {
@@ -725,26 +724,28 @@ static void ucp_ep_config_init_attrs(ucp_worker_t *worker, ucp_rsc_index_t rsc_i
         config->max_bcopy = max_bcopy;
     }

-    if ((iface_attr->cap.flags & zcopy_flag) && (md_attr->cap.flags & UCT_MD_FLAG_REG)) {
-        config->max_zcopy = max_zcopy;
-        config->max_iov = ucs_min(UCP_MAX_IOV, max_iov);
-
-        if (context->config.ext.zcopy_thresh == UCS_CONFIG_MEMUNITS_AUTO) {
-            config->zcopy_auto_thresh = 1;
-            for (it = 0; it < UCP_MAX_IOV; ++it) {
-                zcopy_thresh = ucp_ep_config_get_zcopy_auto_thresh(it + 1,
-                                                                   &md_attr->reg_cost,
-                                                                   context,
-                                                                   iface_attr->bandwidth);
-                zcopy_thresh = ucs_min(zcopy_thresh, adjust_min_val);
-                config->sync_zcopy_thresh[it] = zcopy_thresh;
-                config->zcopy_thresh[it] = zcopy_thresh;
-            }
-        } else {
-            config->zcopy_auto_thresh = 0;
-            config->sync_zcopy_thresh[0] = config->zcopy_thresh[0] =
-                ucs_min(context->config.ext.zcopy_thresh, adjust_min_val);
+    if (!((iface_attr->cap.flags & zcopy_flag) && (md_attr->cap.flags & UCT_MD_FLAG_REG))) {
+        return;
+    }
+
+    config->max_zcopy = max_zcopy;
+    config->max_iov = ucs_min(UCP_MAX_IOV, max_iov);
+
+    if (context->config.ext.zcopy_thresh == UCS_CONFIG_MEMUNITS_AUTO) {
+        config->zcopy_auto_thresh = 1;
+        for (it = 0; it < UCP_MAX_IOV; ++it) {
+            zcopy_thresh = ucp_ep_config_get_zcopy_auto_thresh(it + 1,
+                                                               &md_attr->reg_cost,
+                                                               context,
+                                                               iface_attr->bandwidth);
+            zcopy_thresh = ucs_min(zcopy_thresh, adjust_min_val);
+            config->sync_zcopy_thresh[it] = zcopy_thresh;
+            config->zcopy_thresh[it] = zcopy_thresh;
         }
+    } else {
+        config->zcopy_auto_thresh = 0;
+        config->sync_zcopy_thresh[0] = config->zcopy_thresh[0] =
+            ucs_min(context->config.ext.zcopy_thresh, adjust_min_val);
     }
 }

@@ -758,6 +759,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
     ucp_lane_index_t lane;
     size_t it;
     size_t max_rndv_thresh;
+    size_t max_am_rndv_thresh;

     /* Default settings */
     for (it = 0; it < UCP_MAX_IOV; ++it) {
@@ -773,11 +775,11 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
     config->tag.lane = UCP_NULL_LANE;
     config->tag.proto = &ucp_tag_eager_proto;
     config->tag.sync_proto = &ucp_tag_eager_sync_proto;
-    config->tag.offload.enabled = 0;
     config->tag.rndv.rma_thresh = SIZE_MAX;
     config->tag.rndv.max_get_zcopy = SIZE_MAX;
     config->tag.rndv.am_thresh = SIZE_MAX;
     max_rndv_thresh = SIZE_MAX;
+    max_am_rndv_thresh = SIZE_MAX;

     /* Collect p2p lanes */
     for (lane = 0; lane < config->key.num_lanes; ++lane) {
@@ -806,12 +808,12 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
                                      iface_attr->cap.tag.eager.max_bcopy);
             config->tag.offload.max_recv_iov = iface_attr->cap.tag.recv.max_iov;
-            config->tag.rndv.max_iov = iface_attr->cap.tag.rndv.max_iov;
+            config->tag.offload.max_rndv_iov = iface_attr->cap.tag.rndv.max_iov;
             config->tag.sync_proto = NULL;
             config->tag.proto = NULL;
             config->tag.lane = lane;
-            config->tag.offload.enabled = 1;
             max_rndv_thresh = iface_attr->cap.tag.eager.max_zcopy;
+            max_am_rndv_thresh = iface_attr->cap.tag.eager.max_bcopy;

             ucp_ep_config_set_rndv_thresh(worker, config, lane,
                                           UCT_IFACE_FLAG_TAG_RNDV_ZCOPY,
@@ -838,9 +840,9 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)

     /* Calculate rndv threshold for AM Rendezvous, which may be used by
      * any tag-matching protocol (AM and offload). */
     ucp_ep_config_set_am_rndv_thresh(context, iface_attr, md_attr, config,
-                                     max_rndv_thresh);
+                                     max_am_rndv_thresh);

-    if (!config->tag.offload.enabled) {
+    if (!ucp_ep_is_tag_offload_enabled(config)) {
         /* Tag offload is disabled, AM will be used for all
          * tag-matching protocols */
         ucp_ep_config_set_rndv_thresh(worker, config, config->key.rndv_lane,
@@ -1054,8 +1056,8 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker,
     fprintf(stream, "#\n");

     if (context->config.features & UCP_FEATURE_TAG) {
-        tag_config = (config->tag.offload.enabled) ? &config->tag.eager :
-                                                     &config->am;
+        tag_config = (ucp_ep_is_tag_offload_enabled((ucp_ep_config_t *)config)) ?
+                     &config->tag.eager : &config->am;
         ucp_ep_config_print_tag_proto(stream, "tag_send",
                                       tag_config->max_short,
                                       tag_config->zcopy_thresh[0],
diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h
index 433f77c6937..787c6f0da38 100644
--- a/src/ucp/core/ucp_ep.h
+++ b/src/ucp/core/ucp_ep.h
@@ -155,13 +155,13 @@ typedef struct ucp_ep_config {
             size_t rma_thresh;
             /* Threshold for switching from eager to AM based rendezvous */
             size_t am_thresh;
-            /* Maximal iov count for RNDV offload */
-            size_t max_iov;
         } rndv;

         struct {
+            /* Maximal iov count for tag recv offload */
             size_t max_recv_iov;
-            unsigned enabled;
+            /* Maximal iov count for RNDV offload */
+            size_t max_rndv_iov;
         } offload;
     } tag;
diff --git a/src/ucp/core/ucp_ep.inl b/src/ucp/core/ucp_ep.inl
index b9d3b198112..cf3e24a40f9 100644
--- a/src/ucp/core/ucp_ep.inl
+++ b/src/ucp/core/ucp_ep.inl
@@ -40,7 +40,7 @@ static inline ucp_lane_index_t ucp_ep_get_rndv_get_lane(ucp_ep_h ep)

 static inline ucp_lane_index_t ucp_ep_get_tag_lane(ucp_ep_h ep)
 {
-    ucs_assert(ucp_ep_config(ep)->key.tag_lane != UCP_NULL_RESOURCE);
+    ucs_assert(ucp_ep_config(ep)->key.tag_lane != UCP_NULL_LANE);
     return ucp_ep_config(ep)->key.tag_lane;
 }

@@ -49,6 +49,17 @@ static inline int ucp_ep_is_rndv_lane_present(ucp_ep_h ep)
     return ucp_ep_config(ep)->key.rndv_lane != UCP_NULL_RESOURCE;
 }

+static inline int ucp_ep_is_tag_offload_enabled(ucp_ep_config_t *config)
+{
+    ucp_lane_index_t lane = config->key.tag_lane;
+
+    if (lane != UCP_NULL_LANE) {
+        ucs_assert(config->key.lanes[lane].rsc_index != UCP_NULL_RESOURCE);
+        return 1;
+    }
+    return 0;
+}
+
 static inline uct_ep_h ucp_ep_get_am_uct_ep(ucp_ep_h ep)
 {
     return ep->uct_eps[ucp_ep_get_am_lane(ep)];
diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c
index d69e439e6b2..1937dbf2b15 100644
--- a/src/ucp/core/ucp_worker.c
+++ b/src/ucp/core/ucp_worker.c
@@ -56,11 +56,7 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker)
         if (ucp_worker_is_tl_tag_offload(worker, rsc_index)) {
             ucs_queue_remove(&worker->context->tm.offload_ifaces,
                              &worker->ifaces[rsc_index].queue);
-            if (ucs_queue_length(&worker->context->tm.offload_ifaces) == 1) {
-                /* Enable offload, because just one tag offload capable interface left */
-                worker->context->tm.post_thresh = ucs_max(worker->context->config.ext.tm_thresh,
-                                                          sizeof(ucp_request_hdr_t));
-            }
+            ucp_context_tag_offload_enable(worker->context);
         }

         uct_iface_close(worker->ifaces[rsc_index].iface);
@@ -312,16 +308,9 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker,
     }

     if (ucp_worker_is_tl_tag_offload(worker, tl_id)) {
-        if (ucs_queue_is_empty(&context->tm.offload_ifaces)) {
-            context->tm.post_thresh = ucs_max(context->config.ext.tm_thresh,
-                                              sizeof(ucp_request_hdr_t));
-        } else {
-            /* Some offload interface/s already configured. Disable TM receive offload,
-             * because multiple offload ifaces are not supported yet. */
-            context->tm.post_thresh = SIZE_MAX;
-        }
         worker->ifaces[tl_id].rsc_index = tl_id;
         ucs_queue_push(&context->tm.offload_ifaces, &worker->ifaces[tl_id].queue);
+        ucp_context_tag_offload_enable(context);
     }

     ucs_debug("created interface[%d] using "UCT_TL_RESOURCE_DESC_FMT" on worker %p",
diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c
index 6df915dfb7a..4b02dcec1f9 100644
--- a/src/ucp/tag/rndv.c
+++ b/src/ucp/tag/rndv.c
@@ -72,8 +72,7 @@ static size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
     rndv_rts_hdr->sreq.reqptr = (uintptr_t)sreq;
     rndv_rts_hdr->sreq.sender_uuid = sreq->send.ep->worker->uuid;
     rndv_rts_hdr->size = sreq->send.length;
-    if (UCP_DT_IS_CONTIG(sreq->send.datatype) ||
-        UCP_DT_IS_IOV(sreq->send.datatype)) {
+    if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
         rndv_rts_hdr->address = (uintptr_t) sreq->send.buffer;
         packed_len += ucp_tag_rndv_pack_rkey(sreq, rndv_rts_hdr);
     } else if (UCP_DT_IS_GENERIC(sreq->send.datatype) ||
diff --git a/src/ucp/tag/tag_send.c b/src/ucp/tag/tag_send.c
index 5990f0fe881..b5b3d0aab07 100644
--- a/src/ucp/tag/tag_send.c
+++ b/src/ucp/tag/tag_send.c
@@ -27,6 +27,7 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count,
     ucp_worker_h worker = req->send.ep->worker;
     size_t only_hdr_size = proto->only_hdr_size;
     unsigned flag_iov_single = 1;
+    size_t rndv_thresh = rndv_rma_thresh;
     ucp_rsc_index_t rsc_index;
     unsigned is_iov;
     size_t zcopy_thresh;
@@ -40,8 +41,11 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count,
         req->send.state.dt.iov.iovcnt_offset = 0;
         req->send.state.dt.iov.iov_offset = 0;
         req->send.state.dt.iov.iovcnt = count;
-        flag_iov_single = (count <= config->tag.eager.max_iov) ||
-                          config->tag.offload.enabled;
+        flag_iov_single = (count <= config->tag.eager.max_iov);
+        if ((!flag_iov_single) && ucp_ep_is_tag_offload_enabled(config)) {
+            rndv_thresh = length; /* make sure SW RNDV will be used */
+        }
+
         if (0 == count) {
             /* disable zcopy */
             zcopy_thresh = SIZE_MAX;
@@ -66,16 +70,16 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count,
     req->send.length = length;

     ucs_trace_req("select request(%p) progress algorithm datatype=%lx buffer=%p "
-                  " length=%zu max_short=%zd rndv_rma_thresh=%zu ndv_rma_thresh=%zu "
+                  " length=%zu max_short=%zd rndv_rma_thresh=%zu(%zu) rndv_am_thresh=%zu "
                  "zcopy_thresh=%zu",
                   req, req->send.datatype, req->send.buffer, length, max_short,
-                  rndv_rma_thresh, rndv_am_thresh, zcopy_thresh);
+                  rndv_rma_thresh, rndv_thresh, rndv_am_thresh, zcopy_thresh);

     if (((ssize_t)length <= max_short) && !is_iov) {
         /* short */
         req->send.uct.func = proto->contig_short;
         UCS_PROFILE_REQUEST_EVENT(req, "start_contig_short", req->send.length);
-    } else if ((length >= rndv_rma_thresh) || (length >= rndv_am_thresh)) {
+    } else if ((length >= rndv_thresh) || (length >= rndv_am_thresh)) {
         /* RMA/AM rendezvous */
         ucp_tag_send_start_rndv(req);
         UCS_PROFILE_REQUEST_EVENT(req, "start_rndv", req->send.length);
@@ -129,7 +133,7 @@ static void ucp_tag_req_start_generic(ucp_request_t *req, size_t count,
         /* bcopy single */
         req->send.uct.func = proto->bcopy_single;
         UCS_PROFILE_REQUEST_EVENT(req, "start_gen_bcopy_single", req->send.length);
-    } else if ((length >= rndv_am_thresh) || config->tag.offload.enabled) {
+    } else if (length >= rndv_am_thresh) {
         /* rendezvous */
         ucp_tag_send_start_rndv(req);
         UCS_PROFILE_REQUEST_EVENT(req, "start_rndv", req->send.length);
diff --git a/src/ucp/wireup/address.h b/src/ucp/wireup/address.h
index 41291520b85..1b6061eeda5 100644
--- a/src/ucp/wireup/address.h
+++ b/src/ucp/wireup/address.h
@@ -25,6 +25,7 @@ enum {
                                          UCT_IFACE_FLAG_GET_BCOPY |
                                          UCT_IFACE_FLAG_GET_ZCOPY |
                                          UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
+                                         UCT_IFACE_FLAG_TAG_RNDV_ZCOPY |
                                          UCP_UCT_IFACE_ATOMIC32_FLAGS |
                                          UCP_UCT_IFACE_ATOMIC64_FLAGS |
                                          UCT_IFACE_FLAG_ATOMIC_FADD64 |

From 4f63ef619b0ea649304ad4268b00c6587b4308dd Mon Sep 17 00:00:00 2001
From: Alex Mikheev
Date: Tue, 23 May 2017 11:15:56 +0300
Subject: [PATCH 4/4] UCP: tm offload fixes p3

---
 src/ucp/tag/tag_send.c | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/ucp/tag/tag_send.c b/src/ucp/tag/tag_send.c
index b5b3d0aab07..714d8bead50 100644
--- a/src/ucp/tag/tag_send.c
+++ b/src/ucp/tag/tag_send.c
@@ -27,7 +27,7 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count,
     ucp_worker_h worker = req->send.ep->worker;
     size_t only_hdr_size = proto->only_hdr_size;
     unsigned flag_iov_single = 1;
-    size_t rndv_thresh = rndv_rma_thresh;
+    unsigned force_sw_rndv = 0;
     ucp_rsc_index_t rsc_index;
     unsigned is_iov;
     size_t zcopy_thresh;
@@ -42,8 +42,11 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count,
         req->send.state.dt.iov.iov_offset = 0;
         req->send.state.dt.iov.iovcnt = count;
         flag_iov_single = (count <= config->tag.eager.max_iov);
-        if ((!flag_iov_single) && ucp_ep_is_tag_offload_enabled(config)) {
-            rndv_thresh = length; /* make sure SW RNDV will be used */
+
+        if (!flag_iov_single && ucp_ep_is_tag_offload_enabled(config)) {
+            /* Make sure SW RNDV will be used, because tag offload does
+             * not support multi-packet eager protocols. */
+            force_sw_rndv = 1;
         }

         if (0 == count) {
             /* disable zcopy */
             zcopy_thresh = SIZE_MAX;
@@ -70,16 +73,17 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count,
     req->send.length = length;

     ucs_trace_req("select request(%p) progress algorithm datatype=%lx buffer=%p "
-                  " length=%zu max_short=%zd rndv_rma_thresh=%zu(%zu) rndv_am_thresh=%zu "
+                  " length=%zu max_short=%zd rndv_rma_thresh=%zu rndv_am_thresh=%zu "
                   "zcopy_thresh=%zu",
                   req, req->send.datatype, req->send.buffer, length, max_short,
-                  rndv_rma_thresh, rndv_thresh, rndv_am_thresh, zcopy_thresh);
+                  rndv_rma_thresh, rndv_am_thresh, zcopy_thresh);

     if (((ssize_t)length <= max_short) && !is_iov) {
         /* short */
         req->send.uct.func = proto->contig_short;
         UCS_PROFILE_REQUEST_EVENT(req, "start_contig_short", req->send.length);
-    } else if ((length >= rndv_thresh) || (length >= rndv_am_thresh)) {
+    } else if ((length >= rndv_rma_thresh) || (length >= rndv_am_thresh) ||
+               force_sw_rndv) {
         /* RMA/AM rendezvous */
         ucp_tag_send_start_rndv(req);
         UCS_PROFILE_REQUEST_EVENT(req, "start_rndv", req->send.length);
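
Note for reviewers: the enable/disable policy that PATCH 3/4 centralizes in
ucp_context_tag_offload_enable() is easy to misread, so here is a minimal
standalone sketch of it. All demo_* names and the 24-byte header size are
hypothetical stand-ins, not UCX API; only the branch logic mirrors the patch:
receive-side offload is armed only while exactly one offload-capable interface
is open, and the posting threshold never drops below the request header size,
since an expected SW RNDV may scatter that header into the user buffer.

/* Standalone sketch of the tag-offload toggle; compile with any C99 compiler. */
#include <stddef.h>
#include <stdio.h>

#define DEMO_HDR_SIZE 24u             /* stand-in for sizeof(ucp_request_hdr_t) */
#define DEMO_DISABLED ((size_t)-1)    /* SIZE_MAX: nothing is posted to the HW */

typedef struct {
    size_t   tm_thresh;          /* user-configured threshold (UCX_TM_THRESH) */
    size_t   post_thresh;        /* effective posting threshold */
    unsigned num_offload_ifaces; /* stand-in for ucs_queue_length() */
} demo_context_t;

/* Re-evaluated after every interface open/close, like the patch does. */
static void demo_tag_offload_update(demo_context_t *ctx)
{
    if (ctx->num_offload_ifaces == 1) {
        /* Exactly one offload-capable interface: enable offload, but keep
         * the threshold at least as large as the request header. */
        ctx->post_thresh = (ctx->tm_thresh > DEMO_HDR_SIZE) ?
                           ctx->tm_thresh : DEMO_HDR_SIZE;
    } else {
        /* Zero or multiple offload interfaces: disable receive offload,
         * since multiple offload ifaces are not supported yet. */
        ctx->post_thresh = DEMO_DISABLED;
    }
}

int main(void)
{
    demo_context_t ctx = { .tm_thresh = 1024, .post_thresh = DEMO_DISABLED,
                           .num_offload_ifaces = 0 };
    unsigned n;

    /* 0 ifaces -> disabled, 1 -> enabled at max(1024, 24), 2 -> disabled. */
    for (n = 0; n <= 2; ++n) {
        ctx.num_offload_ifaces = n;
        demo_tag_offload_update(&ctx);
        printf("ifaces=%u post_thresh=%zu\n", n, ctx.post_thresh);
    }
    return 0;
}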
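
Likewise, a compact sketch of the final selection order in ucp_tag_req_start()
after PATCH 4/4. Again, the demo_* names and all thresholds are made up, and
"iovcnt > 1" stands in for the UCP_DT_IS_IOV datatype check; the point is that
a multi-iov send on an offload-enabled endpoint is forced onto SW rendezvous
regardless of its length, because tag offload cannot send multi-packet eager
messages.

/* Sketch of the eager/rendezvous choice; not the actual UCP code path. */
#include <stddef.h>
#include <stdio.h>

typedef enum { DEMO_PROTO_SHORT, DEMO_PROTO_EAGER, DEMO_PROTO_RNDV } demo_proto_t;

static demo_proto_t demo_select(size_t length, size_t iovcnt, size_t max_iov,
                                int offload_enabled, size_t max_short,
                                size_t rndv_rma_thresh, size_t rndv_am_thresh)
{
    int is_iov          = (iovcnt > 1);
    int flag_iov_single = (iovcnt <= max_iov);
    int force_sw_rndv   = 0;

    if (!flag_iov_single && offload_enabled) {
        /* Tag offload cannot send multi-packet eager messages, so such
         * iovs fall through to SW rendezvous regardless of length. */
        force_sw_rndv = 1;
    }

    if ((length <= max_short) && !is_iov) {
        return DEMO_PROTO_SHORT;            /* single short packet */
    } else if ((length >= rndv_rma_thresh) || (length >= rndv_am_thresh) ||
               force_sw_rndv) {
        return DEMO_PROTO_RNDV;             /* RMA/AM rendezvous */
    }
    return DEMO_PROTO_EAGER;                /* bcopy/zcopy eager */
}

int main(void)
{
    /* A 4 KB, 64-entry iov on an offload-enabled endpoint with max_iov 16
     * selects rendezvous (prints 2) even though 4 KB is far below both
     * rendezvous thresholds: exactly the force_sw_rndv path. */
    printf("%d\n", demo_select(4096, 64, 16, 1, 128,
                               (size_t)1 << 20, (size_t)1 << 19));
    return 0;
}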