From 0aad425a46041bd2c523132347657f0eec77b9c1 Mon Sep 17 00:00:00 2001 From: Mikhail Brinskii Date: Fri, 1 Dec 2017 13:30:20 +0200 Subject: [PATCH 1/2] UCP/TAG: Move TM stuff from context to worker Tag Matching queues and other related info are moved to the worker object. Now tag communications should not be crossed between different workers (like communicators in MPI). And ucp_tag_msg_recv_nb* behavior matches their description, that these routines receive a message on the particular worker. --- src/ucp/core/ucp_context.c | 32 --------- src/ucp/core/ucp_context.h | 3 - src/ucp/core/ucp_request.c | 4 +- src/ucp/core/ucp_request.inl | 2 +- src/ucp/core/ucp_worker.c | 42 ++++++++++-- src/ucp/core/ucp_worker.h | 2 + src/ucp/tag/eager_rcv.c | 13 ++-- src/ucp/tag/offload.c | 97 ++++++++++++++-------------- src/ucp/tag/offload.h | 16 ++--- src/ucp/tag/probe.c | 10 ++- src/ucp/tag/rndv.c | 21 +++--- src/ucp/tag/tag_match.c | 3 +- src/ucp/tag/tag_recv.c | 25 +++---- src/ucp/wireup/select.c | 2 +- test/gtest/ucp/test_ucp_tag.cc | 6 +- test/gtest/ucp/test_ucp_tag.h | 2 +- test/gtest/ucp/test_ucp_tag_probe.cc | 2 +- test/gtest/ucp/test_ucp_tag_xfer.cc | 2 +- 18 files changed, 135 insertions(+), 149 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index acf4654d4ad..2c05e7220f4 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -922,20 +922,12 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver goto err_free_config; } - /* initialize tag matching */ - status = ucp_tag_match_init(&context->tm); - if (status != UCS_OK) { - goto err_free_resources; - } - ucs_debug("created ucp context %p [%d mds %d tls] features 0x%lx", context, context->num_mds, context->num_tls, context->config.features); *context_p = context; return UCS_OK; -err_free_resources: - ucp_free_resources(context); err_free_config: ucp_free_config(context); err_free_ctx: @@ -946,7 +938,6 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver void ucp_cleanup(ucp_context_h context) { - ucp_tag_match_cleanup(&context->tm); ucp_free_resources(context); ucp_free_config(context); UCP_THREAD_LOCK_FINALIZE(&context->mt_lock); @@ -1027,26 +1018,3 @@ void ucp_context_print_info(ucp_context_h context, FILE *stream) fprintf(stream, "#\n"); } -void ucp_context_tag_offload_enable(ucp_context_h context) -{ - ucp_worker_iface_t *offload_iface; - - /* Enable offload, if only one tag offload capable interface is present - * (multiple offload ifaces are not supported yet). */ - if (context->config.ext.tm_offload && - (ucs_queue_length(&context->tm.offload.ifaces) == 1)) { - context->tm.offload.thresh = context->config.ext.tm_thresh; - context->tm.offload.zcopy_thresh = context->config.ext.tm_max_bcopy; - - offload_iface = ucs_queue_head_elem_non_empty(&context->tm.offload.ifaces, - ucp_worker_iface_t, queue); - ucp_worker_iface_activate(offload_iface, 0); - - ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu", - context->tm.offload.thresh, context->tm.offload.zcopy_thresh); - } else { - context->tm.offload.thresh = SIZE_MAX; - ucs_debug("Disable TM offload, multiple offload ifaces are not supported"); - } -} - diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 311bde85d2b..f09c1470b7d 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -13,7 +13,6 @@ #include "ucp_thread.h" #include -#include #include #include #include @@ -126,8 +125,6 @@ typedef struct ucp_context { ucp_tl_resource_desc_t *tl_rscs; /* Array of communication resources */ ucp_rsc_index_t num_tls; /* Number of resources in the array*/ - ucp_tag_match_t tm; /* Tag-matching queues and offload info */ - struct { /* Bitmap of features supported by the context */ diff --git a/src/ucp/core/ucp_request.c b/src/ucp/core/ucp_request.c index 24b627df361..4c3bcdb4cf1 100644 --- a/src/ucp/core/ucp_request.c +++ b/src/ucp/core/ucp_request.c @@ -107,15 +107,13 @@ UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request), if (req->flags & UCP_REQUEST_FLAG_EXPECTED) { UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); - ucp_tag_exp_remove(&worker->context->tm, req); + ucp_tag_exp_remove(&worker->tm, req); /* If tag posted to the transport need to wait its completion */ if (!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) { ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); } } diff --git a/src/ucp/core/ucp_request.inl b/src/ucp/core/ucp_request.inl index fe0449dcde5..e839d57eb87 100644 --- a/src/ucp/core/ucp_request.inl +++ b/src/ucp/core/ucp_request.inl @@ -91,7 +91,7 @@ ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status) ucs_status_string(status)); UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status); if (req->flags & UCP_REQUEST_FLAG_BLOCK_OFFLOAD) { - --req->recv.worker->context->tm.offload.sw_req_count; + --req->recv.worker->tm.offload.sw_req_count; } ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info); } diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 32cac7c74f4..9a247644954 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -612,6 +612,30 @@ void ucp_worker_iface_event(int fd, void *arg) ucp_worker_signal_internal(worker); } +static void ucp_worker_tag_offload_enable(ucp_worker_t *worker) +{ + ucp_context_t *context = worker->context; + ucp_worker_iface_t *offload_iface; + + /* Enable offload, if only one tag offload capable interface is present + * (multiple offload ifaces are not supported yet). */ + if (context->config.ext.tm_offload && + (ucs_queue_length(&worker->tm.offload.ifaces) == 1)) { + worker->tm.offload.thresh = context->config.ext.tm_thresh; + worker->tm.offload.zcopy_thresh = context->config.ext.tm_max_bcopy; + + offload_iface = ucs_queue_head_elem_non_empty(&worker->tm.offload.ifaces, + ucp_worker_iface_t, queue); + ucp_worker_iface_activate(offload_iface, 0); + + ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu", + worker->tm.offload.thresh, worker->tm.offload.zcopy_thresh); + } else { + worker->tm.offload.thresh = SIZE_MAX; + ucs_debug("Disable TM offload, multiple offload ifaces are not supported"); + } +} + static void ucp_worker_close_ifaces(ucp_worker_h worker) { ucp_rsc_index_t rsc_index; @@ -643,8 +667,8 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker) } if (ucp_worker_is_tl_tag_offload(worker, rsc_index)) { - ucs_queue_remove(&worker->context->tm.offload.ifaces, &wiface->queue); - ucp_context_tag_offload_enable(worker->context); + ucs_queue_remove(&worker->tm.offload.ifaces, &wiface->queue); + ucp_worker_tag_offload_enable(worker); } uct_iface_close(wiface->iface); @@ -753,8 +777,8 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id, if (ucp_worker_is_tl_tag_offload(worker, tl_id)) { worker->ifaces[tl_id].rsc_index = tl_id; - ucs_queue_push(&context->tm.offload.ifaces, &wiface->queue); - ucp_context_tag_offload_enable(context); + ucs_queue_push(&worker->tm.offload.ifaces, &wiface->queue); + ucp_worker_tag_offload_enable(worker); } return UCS_OK; @@ -1123,6 +1147,13 @@ ucs_status_t ucp_worker_create(ucp_context_h context, cpu_mask = &empty_cpu_mask; } + /* Initialize tag matching */ + status = ucp_tag_match_init(&worker->tm); + if (status != UCS_OK) { + goto err_wakeup_cleanup; + } + + /* Open all resources as interfaces on this worker */ for (tl_id = 0; tl_id < context->num_tls; ++tl_id) { status = ucp_worker_add_iface(worker, tl_id, rx_headroom, cpu_mask); @@ -1145,6 +1176,8 @@ ucs_status_t ucp_worker_create(ucp_context_h context, err_close_ifaces: ucp_worker_close_ifaces(worker); + ucp_tag_match_cleanup(&worker->tm); +err_wakeup_cleanup: ucp_worker_wakeup_cleanup(worker); err_rndv_lanes_mp_cleanup: ucs_mpool_cleanup(&worker->rndv_get_mp, 1); @@ -1184,6 +1217,7 @@ void ucp_worker_destroy(ucp_worker_h worker) ucs_mpool_cleanup(&worker->reg_mp, 1); ucs_mpool_cleanup(&worker->rndv_get_mp, 1); ucp_worker_close_ifaces(worker); + ucp_tag_match_cleanup(&worker->tm); ucp_worker_wakeup_cleanup(worker); ucs_mpool_cleanup(&worker->req_mp, 1); uct_worker_destroy(worker->uct); diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index ac85cb304b3..c041155e861 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -11,6 +11,7 @@ #include "ucp_ep.h" #include "ucp_thread.h" +#include #include #include #include @@ -172,6 +173,7 @@ typedef struct ucp_worker { ucs_mpool_t am_mp; /* Memory pool for AM receives */ ucs_mpool_t reg_mp; /* Registered memory pool */ ucp_mt_lock_t mt_lock; /* Configuration of multi-threading support */ + ucp_tag_match_t tm; /* Tag-matching queues and offload info */ UCS_STATS_NODE_DECLARE(stats); UCS_STATS_NODE_DECLARE(tm_offload_stats); diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index 01fae57927f..572579e285c 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -55,19 +55,18 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, ucp_worker_h worker = arg; ucp_eager_hdr_t *eager_hdr = data; ucp_eager_first_hdr_t *eager_first_hdr = data; - ucp_context_h context = worker->context; ucp_request_t *req; ucs_status_t status; size_t recv_len; ucp_tag_t recv_tag; - UCP_THREAD_CS_ENTER_CONDITIONAL(&context->mt_lock); + UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); ucs_assert(length >= hdr_len); recv_tag = eager_hdr->super.tag; recv_len = length - hdr_len; - req = ucp_tag_exp_search(&context->tm, recv_tag, recv_len, flags); + req = ucp_tag_exp_search(&worker->tm, recv_tag, recv_len, flags); if (req != NULL) { UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len); @@ -85,7 +84,7 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, * because it arrived either: * 1) via SW TM (e. g. peer doesn't support offload) * 2) as unexpected via HW TM */ - ucp_tag_offload_try_cancel(context, req, 1); + ucp_tag_offload_try_cancel(worker, req, 1); if (flags & UCP_RECV_DESC_FLAG_LAST) { req->recv.tag.info.length = recv_len; @@ -111,11 +110,11 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, status = UCS_OK; } else { - status = ucp_tag_unexp_recv(&context->tm, worker, data, length, am_flags, + status = ucp_tag_unexp_recv(&worker->tm, worker, data, length, am_flags, hdr_len, flags); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); + UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return status; } @@ -182,7 +181,7 @@ static ucs_status_t ucp_eager_offload_sync_ack_handler(void *arg, void *data, { ucp_offload_ssend_hdr_t *rep_hdr = data; ucp_worker_t *worker = arg; - ucs_queue_head_t *queue = &worker->context->tm.offload.sync_reqs; + ucs_queue_head_t *queue = &worker->tm.offload.sync_reqs; ucp_request_t *sreq; ucs_queue_iter_t iter; diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index f50a1fd2a45..9523ed9b88e 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -18,25 +18,25 @@ static UCS_F_ALWAYS_INLINE ucp_worker_iface_t* -ucp_tag_offload_iface(ucp_context_t *ctx) +ucp_tag_offload_iface(ucp_worker_t *worker) { - ucs_assert(!ucs_queue_is_empty(&ctx->tm.offload.ifaces)); + ucs_assert(!ucs_queue_is_empty(&worker->tm.offload.ifaces)); - return ucs_queue_head_elem_non_empty(&ctx->tm.offload.ifaces, + return ucs_queue_head_elem_non_empty(&worker->tm.offload.ifaces, ucp_worker_iface_t, queue); } static UCS_F_ALWAYS_INLINE void -ucp_tag_offload_release_buf(ucp_request_t *req, ucp_context_t *ctx) +ucp_tag_offload_release_buf(ucp_request_t *req, ucp_worker_t *worker) { ucp_worker_iface_t *iface; if (req->recv.rdesc != NULL) { ucs_mpool_put_inline(req->recv.rdesc); } else { - iface = ucp_tag_offload_iface(ctx); - ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype, - &req->recv.state); + iface = ucp_tag_offload_iface(worker); + ucp_request_memory_dereg(worker->context, iface->rsc_index, + req->recv.datatype, &req->recv.state); } } @@ -44,10 +44,9 @@ ucp_tag_offload_release_buf(ucp_request_t *req, ucp_context_t *ctx) void ucp_tag_offload_tag_consumed(uct_tag_context_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); - ucp_context_t *ctx = req->recv.worker->context; ucs_queue_head_t *queue; - queue = ucp_tag_exp_get_req_queue(&ctx->tm, req); + queue = ucp_tag_exp_get_req_queue(&req->recv.worker->tm, req); ucs_queue_remove(queue, &req->recv.queue); } @@ -55,15 +54,15 @@ void ucp_tag_offload_tag_consumed(uct_tag_context_t *self) void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, uint64_t imm, size_t length, ucs_status_t status) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); - ucp_context_t *ctx = req->recv.worker->context; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); + ucp_worker_t *worker = req->recv.worker; ucp_worker_iface_t *iface; req->recv.tag.info.sender_tag = stag; req->recv.tag.info.length = length; if (ucs_unlikely(status != UCS_OK)) { - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); goto out; } @@ -79,9 +78,9 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, UCP_RECV_DESC_FLAG_LAST); ucs_mpool_put_inline(req->recv.rdesc); } else { - iface = ucp_tag_offload_iface(ctx); - ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype, - &req->recv.state); + iface = ucp_tag_offload_iface(worker); + ucp_request_memory_dereg(worker->context, iface->rsc_index, + req->recv.datatype, &req->recv.state); } UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, MATCHED); @@ -90,13 +89,14 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, } static UCS_F_ALWAYS_INLINE size_t -ucp_tag_offload_rkey_size(ucp_context_t *ctx) +ucp_tag_offload_rkey_size(ucp_worker_t *worker) { - ucp_worker_iface_t *iface = ucp_tag_offload_iface(ctx); + ucp_worker_iface_t *iface = ucp_tag_offload_iface(worker); + ucp_context_t *context = worker->context; ucp_md_index_t md_idx; - md_idx = ctx->tl_rscs[iface->rsc_index].md_index; - return ctx->tl_mds[md_idx].attr.rkey_packed_size; + md_idx = context->tl_rscs[iface->rsc_index].md_index; + return context->tl_mds[md_idx].attr.rkey_packed_size; } static UCS_F_ALWAYS_INLINE size_t @@ -159,28 +159,28 @@ void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag, ucs_status_t status) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); - ucp_context_t *ctx = req->recv.worker->context; + ucp_worker_t *worker = req->recv.worker; ucp_sw_rndv_hdr_t *sw_hdr = (ucp_sw_rndv_hdr_t*)header; ucp_rndv_rts_hdr_t *rts; size_t rkey_size; - UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, MATCHED_SW_RNDV); + UCP_WORKER_STAT_TAG_OFFLOAD(worker, MATCHED_SW_RNDV); if (ucs_unlikely(status != UCS_OK)) { - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); ucp_request_complete_tag_recv(req, status); return; } rkey_size = (sw_hdr->flags & UCP_RNDV_RTS_FLAG_PACKED_RKEY) ? - ucp_tag_offload_rkey_size(ctx) : 0; + ucp_tag_offload_rkey_size(worker) : 0; rts = alloca(sizeof(*rts) + rkey_size); ucp_tag_offload_fill_sw_rts(rts, sw_hdr, header_length, stag, rkey_size); ucp_rndv_matched(req->recv.worker, req, rts); - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); } UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv, @@ -195,7 +195,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv, size_t len; size_t rkey_size; - rkey_size = ucp_tag_offload_rkey_size(worker->context); + rkey_size = ucp_tag_offload_rkey_size(worker); rts = ucs_alloca(sizeof(*rts) + rkey_size); /* SW rndv req may also carry a key */ if (remote_addr) { @@ -220,9 +220,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv, return UCS_OK; } -void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) +void ucp_tag_offload_cancel(ucp_worker_t *worker, ucp_request_t *req, int force) { - ucp_worker_iface_t *ucp_iface = ucp_tag_offload_iface(ctx); + + ucp_worker_iface_t *ucp_iface = ucp_tag_offload_iface(worker); ucs_status_t status; status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, @@ -232,19 +233,20 @@ void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) ucs_status_string(status)); return; } - UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, CANCELED); + UCP_WORKER_STAT_TAG_OFFLOAD(worker, CANCELED); /* if cancel is not forced, need to wait its completion */ if (force) { - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); } } -int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) +int ucp_tag_offload_post(ucp_request_t *req) { - size_t length = req->recv.length; - ucp_mem_desc_t *rdesc = NULL; - ucp_worker_t *worker = req->recv.worker; + size_t length = req->recv.length; + ucp_mem_desc_t *rdesc = NULL; + ucp_worker_t *worker = req->recv.worker; + ucp_context_t *context = worker->context; ucp_worker_iface_t *ucp_iface; ucs_status_t status; ucp_rsc_index_t mdi; @@ -256,8 +258,8 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) return 0; } - if ((ctx->config.tag_sender_mask & req->recv.tag.tag_mask) != - ctx->config.tag_sender_mask) { + if ((context->config.tag_sender_mask & req->recv.tag.tag_mask) != + context->config.tag_sender_mask) { /* Wildcard. * TODO add check that only offload capable iface present. In * this case can post tag as well. */ @@ -265,7 +267,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) return 0; } - if (ctx->tm.offload.sw_req_count) { + if (worker->tm.offload.sw_req_count) { /* There are some requests which must be completed in SW. Do not post * tags to HW until they are completed. */ UCP_WORKER_STAT_TAG_OFFLOAD(worker, BLOCK_SW_PEND); @@ -276,9 +278,9 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) * posted to the transport */ ucs_assert(req->recv.state.offset == 0); - ucp_iface = ucp_tag_offload_iface(ctx); + ucp_iface = ucp_tag_offload_iface(worker); - if (ucs_unlikely(length >= ctx->tm.offload.zcopy_thresh)) { + if (ucs_unlikely(length >= worker->tm.offload.zcopy_thresh)) { if (length > ucp_iface->attr.cap.tag.recv.max_zcopy) { /* Post maximum allowed length. If sender sends smaller message * (which is allowed per MPI standard), max recv should fit it. @@ -289,23 +291,23 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) length = ucp_iface->attr.cap.tag.recv.max_zcopy; } - status = ucp_request_memory_reg(ctx, ucp_iface->rsc_index, req->recv.buffer, - length, req->recv.datatype, - &req->recv.state); + status = ucp_request_memory_reg(context, ucp_iface->rsc_index, + req->recv.buffer, length, + req->recv.datatype, &req->recv.state); if (status != UCS_OK) { return 0; } - req->recv.rdesc = NULL; - iov.buffer = (void*)req->recv.buffer; - iov.memh = req->recv.state.dt.contig[0].memh; + req->recv.rdesc = NULL; + iov.buffer = (void*)req->recv.buffer; + iov.memh = req->recv.state.dt.contig[0].memh; } else { rdesc = ucp_worker_mpool_get(worker); if (rdesc == NULL) { return 0; } - mdi = ctx->tl_rscs[ucp_iface->rsc_index].md_index; + mdi = context->tl_rscs[ucp_iface->rsc_index].md_index; iov.memh = ucp_memh2uct(rdesc->memh, mdi); iov.buffer = rdesc + 1; req->recv.rdesc = rdesc; @@ -324,7 +326,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) &req->recv.uct_ctx); if (status != UCS_OK) { /* No more matching entries in the transport. */ - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); UCP_WORKER_STAT_TAG_OFFLOAD(worker, BLOCK_TAG_EXCEED); return 0; } @@ -583,8 +585,7 @@ static UCS_F_ALWAYS_INLINE void ucp_tag_offload_sync_posted(ucp_worker_t *worker, ucp_request_t *req) { req->send.tag_offload.ssend_tag = req->send.tag; - ucs_queue_push(&worker->context->tm.offload.sync_reqs, - &req->send.tag_offload.queue); + ucs_queue_push(&worker->tm.offload.sync_reqs, &req->send.tag_offload.queue); } static ucs_status_t ucp_tag_offload_eager_sync_bcopy(uct_pending_req_t *self) diff --git a/src/ucp/tag/offload.h b/src/ucp/tag/offload.h index 69ec2644777..57fc20d6b6d 100644 --- a/src/ucp/tag/offload.h +++ b/src/ucp/tag/offload.h @@ -60,28 +60,28 @@ ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, uint64_t stag uint64_t remote_addr, size_t length, const void *rkey_buf); -void ucp_tag_offload_cancel(ucp_context_t *context, ucp_request_t *req, int force); +void ucp_tag_offload_cancel(ucp_worker_t *worker, ucp_request_t *req, int force); -int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req); +int ucp_tag_offload_post(ucp_request_t *req); static UCS_F_ALWAYS_INLINE void -ucp_tag_offload_try_post(ucp_context_t *ctx, ucp_request_t *req) +ucp_tag_offload_try_post(ucp_worker_t *worker, ucp_request_t *req) { - if (ucs_unlikely((req->recv.length >= ctx->tm.offload.thresh) && + if (ucs_unlikely((req->recv.length >= worker->tm.offload.thresh) && (req->recv.state.offset == 0))) { - if (ucp_tag_offload_post(ctx, req)) { + if (ucp_tag_offload_post(req)) { return; } } req->flags |= UCP_REQUEST_FLAG_BLOCK_OFFLOAD; - ++ctx->tm.offload.sw_req_count; + ++worker->tm.offload.sw_req_count; } static UCS_F_ALWAYS_INLINE void -ucp_tag_offload_try_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) +ucp_tag_offload_try_cancel(ucp_worker_t *worker, ucp_request_t *req, int force) { if (ucs_unlikely(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) { - ucp_tag_offload_cancel(ctx, req, force); + ucp_tag_offload_cancel(worker, req, force); } } diff --git a/src/ucp/tag/probe.c b/src/ucp/tag/probe.c index 2dd4340e9ec..da6e09fe196 100644 --- a/src/ucp/tag/probe.c +++ b/src/ucp/tag/probe.c @@ -14,7 +14,7 @@ static UCS_F_ALWAYS_INLINE ucp_recv_desc_t* -ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask, +ucp_tag_probe_search(ucp_worker_h worker, ucp_tag_t tag, uint64_t tag_mask, ucp_tag_recv_info_t *info, int remove) { ucp_recv_desc_t *rdesc; @@ -22,7 +22,7 @@ ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask, ucp_tag_t recv_tag; unsigned flags; - ucs_list_for_each(rdesc, &context->tm.unexpected.all, + ucs_list_for_each(rdesc, &worker->tm.unexpected.all, tag_list[UCP_RDESC_ALL_LIST]) { hdr = (void*)(rdesc + 1); recv_tag = hdr->tag; @@ -60,16 +60,14 @@ ucp_tag_message_h ucp_tag_probe_nb(ucp_worker_h worker, ucp_tag_t tag, ucp_tag_t tag_mask, int remove, ucp_tag_recv_info_t *info) { - ucp_context_h context = worker->context; + ucp_context_h UCS_V_UNUSED context = worker->context; ucp_recv_desc_t *ret; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&context->mt_lock); ucs_trace_req("probe_nb tag %"PRIx64"/%"PRIx64, tag, tag_mask); - ret = ucp_tag_probe_search(context, tag, tag_mask, info, remove); + ret = ucp_tag_probe_search(worker, tag, tag_mask, info, remove); - UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return ret; diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 4a4e6730886..5ffa2c7cf8b 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -562,34 +562,33 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, unsigned tl_flags) { - const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST | - UCP_RECV_DESC_FLAG_LAST | - UCP_RECV_DESC_FLAG_RNDV; - ucp_worker_h worker = arg; - ucp_rndv_rts_hdr_t *rndv_rts_hdr = data; - ucp_context_h context = worker->context; + const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST | + UCP_RECV_DESC_FLAG_LAST | + UCP_RECV_DESC_FLAG_RNDV; + ucp_worker_h worker = arg; + ucp_rndv_rts_hdr_t *rndv_rts_hdr = data; ucp_request_t *rreq; ucs_status_t status; - UCP_THREAD_CS_ENTER_CONDITIONAL(&context->mt_lock); + UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - rreq = ucp_tag_exp_search(&context->tm, rndv_rts_hdr->super.tag, + rreq = ucp_tag_exp_search(&worker->tm, rndv_rts_hdr->super.tag, rndv_rts_hdr->size, recv_flags); if (rreq != NULL) { ucp_rndv_matched(worker, rreq, rndv_rts_hdr); /* Cancel req in transport if it was offloaded, because it arrived as unexpected */ - ucp_tag_offload_try_cancel(context, rreq, 1); + ucp_tag_offload_try_cancel(worker, rreq, 1); UCP_WORKER_STAT_RNDV(worker, EXP); status = UCS_OK; } else { - status = ucp_tag_unexp_recv(&context->tm, worker, data, length, tl_flags, + status = ucp_tag_unexp_recv(&worker->tm, worker, data, length, tl_flags, sizeof(*rndv_rts_hdr), recv_flags); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); + UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return status; } diff --git a/src/ucp/tag/tag_match.c b/src/ucp/tag/tag_match.c index f5c87ffa2a4..d12402e3046 100644 --- a/src/ucp/tag/tag_match.c +++ b/src/ucp/tag/tag_match.c @@ -57,13 +57,12 @@ int ucp_tag_unexp_is_empty(ucp_tag_match_t *tm) void ucp_tag_exp_remove(ucp_tag_match_t *tm, ucp_request_t *req) { ucs_queue_head_t *queue = ucp_tag_exp_get_req_queue(tm, req); - ucp_context_t *ctx = ucs_container_of(tm, ucp_context_t, tm); ucs_queue_iter_t iter; ucp_request_t *qreq; ucs_queue_for_each_safe(qreq, iter, queue, recv.queue) { if (qreq == req) { - ucp_tag_offload_try_cancel(ctx, req, 0); + ucp_tag_offload_try_cancel(req->recv.worker, req, 0); ucs_queue_del_iter(queue, iter); return; } diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index 710607b8096..742def01a7a 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -29,7 +29,6 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, ucp_tag_recv_callback_t cb, ucp_recv_desc_t *first_rdesc, unsigned *save_rreq) { - ucp_context_h context = worker->context; ucp_recv_desc_t *rdesc, *next; ucs_list_link_t *list; ucs_status_t status; @@ -38,26 +37,26 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, int i_list; /* fast check of global unexpected queue */ - if (ucs_list_is_empty(&context->tm.unexpected.all)) { + if (ucs_list_is_empty(&worker->tm.unexpected.all)) { return UCS_INPROGRESS; } if (first_rdesc == NULL) { if (tag_mask == UCP_TAG_MASK_FULL) { - list = ucp_tag_unexp_get_list_for_tag(&context->tm, tag); + list = ucp_tag_unexp_get_list_for_tag(&worker->tm, tag); if (ucs_list_is_empty(list)) { return UCS_INPROGRESS; } i_list = UCP_RDESC_HASH_LIST; } else { - list = &context->tm.unexpected.all; + list = &worker->tm.unexpected.all; i_list = UCP_RDESC_ALL_LIST; } rdesc = ucs_list_head(list, ucp_recv_desc_t, tag_list[i_list]); } else { ucs_assert(tag_mask == UCP_TAG_MASK_FULL); - list = ucp_tag_unexp_get_list_for_tag(&context->tm, tag); + list = ucp_tag_unexp_get_list_for_tag(&worker->tm, tag); i_list = UCP_RDESC_HASH_LIST; rdesc = first_rdesc; } @@ -144,7 +143,7 @@ ucp_tag_recv_request_completed(ucp_request_t *req, ucs_status_t status, req->status = status; if (req->flags & UCP_REQUEST_FLAG_BLOCK_OFFLOAD) { - --req->recv.worker->context->tm.offload.sw_req_count; + --req->recv.worker->tm.offload.sw_req_count; } if ((req->flags |= UCP_REQUEST_FLAG_COMPLETED) & UCP_REQUEST_FLAG_RELEASED) { ucp_request_put(req); @@ -160,7 +159,6 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, { unsigned save_rreq = 1; ucs_queue_head_t *queue; - ucp_context_h context; ucs_status_t status; size_t buffer_size; @@ -183,8 +181,7 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, } else if (save_rreq) { /* If not found on unexpected, wait until it arrives. * If was found but need this receive request for later completion, save it */ - context = worker->context; - queue = ucp_tag_exp_get_queue(&context->tm, tag, tag_mask); + queue = ucp_tag_exp_get_queue(&worker->tm, tag, tag_mask); req->recv.buffer = buffer; req->recv.length = buffer_size; @@ -193,11 +190,11 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, req->recv.tag.tag_mask = tag_mask; req->recv.tag.cb = cb; - ucp_tag_exp_push(&context->tm, queue, req); + ucp_tag_exp_push(&worker->tm, queue, req); /* If offload supported, post this tag to transport as well. * TODO: need to distinguish the cases when posting is not needed. */ - ucp_tag_offload_try_post(worker->context, req); + ucp_tag_offload_try_post(worker, req); ucs_trace_req("%s returning expected request %p (%p)", debug_name, req, req + 1); } @@ -215,13 +212,11 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_recv_nbr, ucs_status_t status; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); status = ucp_tag_recv_common(worker, buffer, count, datatype, tag, tag_mask, req, UCP_REQUEST_DEBUG_FLAG_EXTERNAL, NULL, NULL, "recv_nbr"); - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return status; } @@ -236,7 +231,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_recv_nb, ucp_request_t *req; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); req = ucp_request_get(worker); if (ucs_likely(req != NULL)) { @@ -247,7 +241,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_recv_nb, ret = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return ret; } @@ -263,7 +256,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_msg_recv_nb, ucp_request_t *req; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); req = ucp_request_get(worker); if (ucs_likely(req != NULL)) { @@ -276,7 +268,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_msg_recv_nb, ret = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return ret; } diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 3ec9b8a0195..b811ff41d34 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -549,7 +549,7 @@ static int ucp_wireup_tag_lane_supported(ucp_ep_h ep, { return ((ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG) && (ep->worker->context->config.ext.tm_offload == tm_config_mode) && - !ucs_queue_is_empty(&ep->worker->context->tm.offload.ifaces) && + !ucs_queue_is_empty(&ep->worker->tm.offload.ifaces) && /* TODO: remove check below when UCP_ERR_HANDLING_MODE_PEER supports * RNDV-protocol or HW TM supports fragmented protocols */ diff --git a/test/gtest/ucp/test_ucp_tag.cc b/test/gtest/ucp/test_ucp_tag.cc index dfeeee6d731..4b8f4d67209 100644 --- a/test/gtest/ucp/test_ucp_tag.cc +++ b/test/gtest/ucp/test_ucp_tag.cc @@ -9,7 +9,7 @@ #include extern "C" { -#include +#include } @@ -136,14 +136,14 @@ void test_ucp_tag::wait_and_validate(request *req) request_release(req); } -void test_ucp_tag::wait_for_unexpected_msg(ucp_context_h context, double sec) +void test_ucp_tag::wait_for_unexpected_msg(ucp_worker_h worker, double sec) { /* Wait for some message to be added to unexpected queue */ ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(sec); do { short_progress_loop(); - } while (ucp_tag_unexp_is_empty(&context->tm) && (ucs_get_time() < timeout)); + } while (ucp_tag_unexp_is_empty(&worker->tm) && (ucs_get_time() < timeout)); } test_ucp_tag::request * diff --git a/test/gtest/ucp/test_ucp_tag.h b/test/gtest/ucp/test_ucp_tag.h index e83d6ba00e4..64288d83988 100644 --- a/test/gtest/ucp/test_ucp_tag.h +++ b/test/gtest/ucp/test_ucp_tag.h @@ -85,7 +85,7 @@ class test_ucp_tag : public ucp_test { void wait_and_validate(request *req); - void wait_for_unexpected_msg(ucp_context_h context, double sec); + void wait_for_unexpected_msg(ucp_worker_h worker, double sec); static void* dt_common_start(size_t count); diff --git a/test/gtest/ucp/test_ucp_tag_probe.cc b/test/gtest/ucp/test_ucp_tag_probe.cc index 1441247f8b9..28700c3071c 100644 --- a/test/gtest/ucp/test_ucp_tag_probe.cc +++ b/test/gtest/ucp/test_ucp_tag_probe.cc @@ -147,7 +147,7 @@ UCS_TEST_P(test_ucp_tag_probe, send_rndv_msg_probe, "RNDV_THRESH=1048576") { ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req)); /* receiver - get the RTS and put it into unexpected */ - wait_for_unexpected_msg(receiver().ucph(), 10.0); + wait_for_unexpected_msg(receiver().worker(), 10.0); /* receiver - match the rts, remove it from unexpected and return it */ message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff, 1, &info); diff --git a/test/gtest/ucp/test_ucp_tag_xfer.cc b/test/gtest/ucp/test_ucp_tag_xfer.cc index 10d411e256d..6d086b70ff6 100644 --- a/test/gtest/ucp/test_ucp_tag_xfer.cc +++ b/test/gtest/ucp/test_ucp_tag_xfer.cc @@ -445,7 +445,7 @@ size_t test_ucp_tag_xfer::do_xfer(const void *sendbuf, void *recvbuf, } else { sreq = do_send(sendbuf, count, send_dt, sync); - wait_for_unexpected_msg(receiver().ucph(), 10.0); + wait_for_unexpected_msg(receiver().worker(), 10.0); if (sync) { EXPECT_FALSE(sreq->completed); From 249ffc57420451b34e71112bbf975706b7df1eb7 Mon Sep 17 00:00:00 2001 From: Mikhail Brinskii Date: Tue, 5 Dec 2017 09:31:31 +0200 Subject: [PATCH 2/2] UCP/TAG: Remove unnecessary locks in tag AM callbacks No need to get mutex in tag AM callbacks, since they can be called from progress context only and ucp_worker_progress is already guarded with locks. --- src/ucp/tag/eager_rcv.c | 3 --- src/ucp/tag/rndv.c | 3 --- 2 files changed, 6 deletions(-) diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index 572579e285c..0a6d1af9197 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -60,8 +60,6 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, size_t recv_len; ucp_tag_t recv_tag; - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - ucs_assert(length >= hdr_len); recv_tag = eager_hdr->super.tag; recv_len = length - hdr_len; @@ -114,7 +112,6 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, hdr_len, flags); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return status; } diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 5ffa2c7cf8b..f0b111bac11 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -570,8 +570,6 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, ucp_request_t *rreq; ucs_status_t status; - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - rreq = ucp_tag_exp_search(&worker->tm, rndv_rts_hdr->super.tag, rndv_rts_hdr->size, recv_flags); if (rreq != NULL) { @@ -588,7 +586,6 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, sizeof(*rndv_rts_hdr), recv_flags); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return status; }