diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index 93544114ed6..71b6a36b780 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -938,20 +938,12 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver goto err_free_resources; } - /* initialize tag matching */ - status = ucp_tag_match_init(&context->tm); - if (status != UCS_OK) { - goto err_rkey_mp_cleanup; - } - ucs_debug("created ucp context %p [%d mds %d tls] features 0x%lx", context, context->num_mds, context->num_tls, context->config.features); *context_p = context; return UCS_OK; -err_rkey_mp_cleanup: - ucs_mpool_cleanup(&context->rkey_mp, 1); err_free_resources: ucp_free_resources(context); err_free_config: @@ -964,7 +956,6 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver void ucp_cleanup(ucp_context_h context) { - ucp_tag_match_cleanup(&context->tm); ucs_mpool_cleanup(&context->rkey_mp, 1); ucp_free_resources(context); ucp_free_config(context); @@ -1046,26 +1037,3 @@ void ucp_context_print_info(ucp_context_h context, FILE *stream) fprintf(stream, "#\n"); } -void ucp_context_tag_offload_enable(ucp_context_h context) -{ - ucp_worker_iface_t *offload_iface; - - /* Enable offload, if only one tag offload capable interface is present - * (multiple offload ifaces are not supported yet). */ - if (context->config.ext.tm_offload && - (ucs_queue_length(&context->tm.offload.ifaces) == 1)) { - context->tm.offload.thresh = context->config.ext.tm_thresh; - context->tm.offload.zcopy_thresh = context->config.ext.tm_max_bcopy; - - offload_iface = ucs_queue_head_elem_non_empty(&context->tm.offload.ifaces, - ucp_worker_iface_t, queue); - ucp_worker_iface_activate(offload_iface, 0); - - ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu", - context->tm.offload.thresh, context->tm.offload.zcopy_thresh); - } else { - context->tm.offload.thresh = SIZE_MAX; - ucs_debug("Disable TM offload, multiple offload ifaces are not supported"); - } -} - diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 30e166330a5..c5316588163 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -13,7 +13,6 @@ #include "ucp_thread.h" #include -#include #include #include #include @@ -129,8 +128,6 @@ typedef struct ucp_context { ucs_mpool_t rkey_mp; /* Pool for memory keys */ - ucp_tag_match_t tm; /* Tag-matching queues and offload info */ - struct { /* Bitmap of features supported by the context */ diff --git a/src/ucp/core/ucp_request.c b/src/ucp/core/ucp_request.c index 24b627df361..4c3bcdb4cf1 100644 --- a/src/ucp/core/ucp_request.c +++ b/src/ucp/core/ucp_request.c @@ -107,15 +107,13 @@ UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request), if (req->flags & UCP_REQUEST_FLAG_EXPECTED) { UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); - ucp_tag_exp_remove(&worker->context->tm, req); + ucp_tag_exp_remove(&worker->tm, req); /* If tag posted to the transport need to wait its completion */ if (!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) { ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); } } diff --git a/src/ucp/core/ucp_request.inl b/src/ucp/core/ucp_request.inl index afc9813bc44..f9e60fb4a22 100644 --- a/src/ucp/core/ucp_request.inl +++ b/src/ucp/core/ucp_request.inl @@ -91,7 +91,7 @@ ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status) ucs_status_string(status)); UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status); if (req->flags & UCP_REQUEST_FLAG_BLOCK_OFFLOAD) { - --req->recv.worker->context->tm.offload.sw_req_count; + --req->recv.worker->tm.offload.sw_req_count; } ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info); } diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 2aee943d12d..15611558d6e 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -612,6 +612,30 @@ void ucp_worker_iface_event(int fd, void *arg) ucp_worker_signal_internal(worker); } +static void ucp_worker_tag_offload_enable(ucp_worker_t *worker) +{ + ucp_context_t *context = worker->context; + ucp_worker_iface_t *offload_iface; + + /* Enable offload, if only one tag offload capable interface is present + * (multiple offload ifaces are not supported yet). */ + if (context->config.ext.tm_offload && + (ucs_queue_length(&worker->tm.offload.ifaces) == 1)) { + worker->tm.offload.thresh = context->config.ext.tm_thresh; + worker->tm.offload.zcopy_thresh = context->config.ext.tm_max_bcopy; + + offload_iface = ucs_queue_head_elem_non_empty(&worker->tm.offload.ifaces, + ucp_worker_iface_t, queue); + ucp_worker_iface_activate(offload_iface, 0); + + ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu", + worker->tm.offload.thresh, worker->tm.offload.zcopy_thresh); + } else { + worker->tm.offload.thresh = SIZE_MAX; + ucs_debug("Disable TM offload, multiple offload ifaces are not supported"); + } +} + static void ucp_worker_close_ifaces(ucp_worker_h worker) { ucp_rsc_index_t rsc_index; @@ -643,8 +667,8 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker) } if (ucp_worker_is_tl_tag_offload(worker, rsc_index)) { - ucs_queue_remove(&worker->context->tm.offload.ifaces, &wiface->queue); - ucp_context_tag_offload_enable(worker->context); + ucs_queue_remove(&worker->tm.offload.ifaces, &wiface->queue); + ucp_worker_tag_offload_enable(worker); } uct_iface_close(wiface->iface); @@ -753,8 +777,8 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id, if (ucp_worker_is_tl_tag_offload(worker, tl_id)) { worker->ifaces[tl_id].rsc_index = tl_id; - ucs_queue_push(&context->tm.offload.ifaces, &wiface->queue); - ucp_context_tag_offload_enable(context); + ucs_queue_push(&worker->tm.offload.ifaces, &wiface->queue); + ucp_worker_tag_offload_enable(worker); } return UCS_OK; @@ -1123,6 +1147,13 @@ ucs_status_t ucp_worker_create(ucp_context_h context, cpu_mask = &empty_cpu_mask; } + /* Initialize tag matching */ + status = ucp_tag_match_init(&worker->tm); + if (status != UCS_OK) { + goto err_wakeup_cleanup; + } + + /* Open all resources as interfaces on this worker */ for (tl_id = 0; tl_id < context->num_tls; ++tl_id) { status = ucp_worker_add_iface(worker, tl_id, rx_headroom, cpu_mask); @@ -1145,6 +1176,8 @@ ucs_status_t ucp_worker_create(ucp_context_h context, err_close_ifaces: ucp_worker_close_ifaces(worker); + ucp_tag_match_cleanup(&worker->tm); +err_wakeup_cleanup: ucp_worker_wakeup_cleanup(worker); err_rndv_lanes_mp_cleanup: ucs_mpool_cleanup(&worker->rndv_get_mp, 1); @@ -1184,6 +1217,7 @@ void ucp_worker_destroy(ucp_worker_h worker) ucs_mpool_cleanup(&worker->reg_mp, 1); ucs_mpool_cleanup(&worker->rndv_get_mp, 1); ucp_worker_close_ifaces(worker); + ucp_tag_match_cleanup(&worker->tm); ucp_worker_wakeup_cleanup(worker); ucs_mpool_cleanup(&worker->req_mp, 1); uct_worker_destroy(worker->uct); diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index ac85cb304b3..c041155e861 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -11,6 +11,7 @@ #include "ucp_ep.h" #include "ucp_thread.h" +#include #include #include #include @@ -172,6 +173,7 @@ typedef struct ucp_worker { ucs_mpool_t am_mp; /* Memory pool for AM receives */ ucs_mpool_t reg_mp; /* Registered memory pool */ ucp_mt_lock_t mt_lock; /* Configuration of multi-threading support */ + ucp_tag_match_t tm; /* Tag-matching queues and offload info */ UCS_STATS_NODE_DECLARE(stats); UCS_STATS_NODE_DECLARE(tm_offload_stats); diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index 01fae57927f..0a6d1af9197 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -55,19 +55,16 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, ucp_worker_h worker = arg; ucp_eager_hdr_t *eager_hdr = data; ucp_eager_first_hdr_t *eager_first_hdr = data; - ucp_context_h context = worker->context; ucp_request_t *req; ucs_status_t status; size_t recv_len; ucp_tag_t recv_tag; - UCP_THREAD_CS_ENTER_CONDITIONAL(&context->mt_lock); - ucs_assert(length >= hdr_len); recv_tag = eager_hdr->super.tag; recv_len = length - hdr_len; - req = ucp_tag_exp_search(&context->tm, recv_tag, recv_len, flags); + req = ucp_tag_exp_search(&worker->tm, recv_tag, recv_len, flags); if (req != NULL) { UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len); @@ -85,7 +82,7 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, * because it arrived either: * 1) via SW TM (e. g. peer doesn't support offload) * 2) as unexpected via HW TM */ - ucp_tag_offload_try_cancel(context, req, 1); + ucp_tag_offload_try_cancel(worker, req, 1); if (flags & UCP_RECV_DESC_FLAG_LAST) { req->recv.tag.info.length = recv_len; @@ -111,11 +108,10 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, status = UCS_OK; } else { - status = ucp_tag_unexp_recv(&context->tm, worker, data, length, am_flags, + status = ucp_tag_unexp_recv(&worker->tm, worker, data, length, am_flags, hdr_len, flags); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); return status; } @@ -182,7 +178,7 @@ static ucs_status_t ucp_eager_offload_sync_ack_handler(void *arg, void *data, { ucp_offload_ssend_hdr_t *rep_hdr = data; ucp_worker_t *worker = arg; - ucs_queue_head_t *queue = &worker->context->tm.offload.sync_reqs; + ucs_queue_head_t *queue = &worker->tm.offload.sync_reqs; ucp_request_t *sreq; ucs_queue_iter_t iter; diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index 4052ef3c460..a47a4e52aad 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -18,25 +18,25 @@ static UCS_F_ALWAYS_INLINE ucp_worker_iface_t* -ucp_tag_offload_iface(ucp_context_t *ctx) +ucp_tag_offload_iface(ucp_worker_t *worker) { - ucs_assert(!ucs_queue_is_empty(&ctx->tm.offload.ifaces)); + ucs_assert(!ucs_queue_is_empty(&worker->tm.offload.ifaces)); - return ucs_queue_head_elem_non_empty(&ctx->tm.offload.ifaces, + return ucs_queue_head_elem_non_empty(&worker->tm.offload.ifaces, ucp_worker_iface_t, queue); } static UCS_F_ALWAYS_INLINE void -ucp_tag_offload_release_buf(ucp_request_t *req, ucp_context_t *ctx) +ucp_tag_offload_release_buf(ucp_request_t *req, ucp_worker_t *worker) { ucp_worker_iface_t *iface; if (req->recv.rdesc != NULL) { ucs_mpool_put_inline(req->recv.rdesc); } else { - iface = ucp_tag_offload_iface(ctx); - ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype, - &req->recv.state); + iface = ucp_tag_offload_iface(worker); + ucp_request_memory_dereg(worker->context, iface->rsc_index, + req->recv.datatype, &req->recv.state); } } @@ -44,10 +44,9 @@ ucp_tag_offload_release_buf(ucp_request_t *req, ucp_context_t *ctx) void ucp_tag_offload_tag_consumed(uct_tag_context_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); - ucp_context_t *ctx = req->recv.worker->context; ucs_queue_head_t *queue; - queue = ucp_tag_exp_get_req_queue(&ctx->tm, req); + queue = ucp_tag_exp_get_req_queue(&req->recv.worker->tm, req); ucs_queue_remove(queue, &req->recv.queue); } @@ -55,15 +54,15 @@ void ucp_tag_offload_tag_consumed(uct_tag_context_t *self) void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, uint64_t imm, size_t length, ucs_status_t status) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); - ucp_context_t *ctx = req->recv.worker->context; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); + ucp_worker_t *worker = req->recv.worker; ucp_worker_iface_t *iface; req->recv.tag.info.sender_tag = stag; req->recv.tag.info.length = length; if (ucs_unlikely(status != UCS_OK)) { - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); goto out; } @@ -79,9 +78,9 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, UCP_RECV_DESC_FLAG_LAST); ucs_mpool_put_inline(req->recv.rdesc); } else { - iface = ucp_tag_offload_iface(ctx); - ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype, - &req->recv.state); + iface = ucp_tag_offload_iface(worker); + ucp_request_memory_dereg(worker->context, iface->rsc_index, + req->recv.datatype, &req->recv.state); } UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, MATCHED); @@ -90,13 +89,14 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, } static UCS_F_ALWAYS_INLINE size_t -ucp_tag_offload_rkey_size(ucp_context_t *ctx) +ucp_tag_offload_rkey_size(ucp_worker_t *worker) { - ucp_worker_iface_t *iface = ucp_tag_offload_iface(ctx); + ucp_worker_iface_t *iface = ucp_tag_offload_iface(worker); + ucp_context_t *context = worker->context; ucp_md_index_t md_idx; - md_idx = ctx->tl_rscs[iface->rsc_index].md_index; - return ctx->tl_mds[md_idx].attr.rkey_packed_size; + md_idx = context->tl_rscs[iface->rsc_index].md_index; + return context->tl_mds[md_idx].attr.rkey_packed_size; } static UCS_F_ALWAYS_INLINE size_t @@ -130,19 +130,19 @@ void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag, const void *header, unsigned header_length, ucs_status_t status) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); - ucp_context_t *ctx = req->recv.worker->context; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); + ucp_worker_t *worker = req->recv.worker; - UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, MATCHED_SW_RNDV); + UCP_WORKER_STAT_TAG_OFFLOAD(worker, MATCHED_SW_RNDV); if (ucs_unlikely(status != UCS_OK)) { - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); ucp_request_complete_tag_recv(req, status); return; } ucp_rndv_matched(req->recv.worker, req, (ucp_rndv_rts_hdr_t*)header); - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); } UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv, @@ -151,15 +151,16 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv, unsigned hdr_length, uint64_t remote_addr, size_t length, const void *rkey_buf) { - ucp_worker_t *worker = arg; - ucp_request_hdr_t *rndv_hdr = (ucp_request_hdr_t*)hdr; + ucp_worker_t *worker = arg; + ucp_request_hdr_t *rndv_hdr; ucp_rndv_rts_hdr_t *rts; size_t len; size_t rkey_size; if (remote_addr) { /* Unexpected tag offload RNDV */ - rkey_size = ucp_tag_offload_rkey_size(worker->context); + rndv_hdr = (ucp_request_hdr_t*)hdr; + rkey_size = ucp_tag_offload_rkey_size(worker); rts = ucs_alloca(sizeof(*rts) + rkey_size); /* SW rndv req may also carry a key */ ucp_tag_offload_fill_rts(rts, rndv_hdr, stag, remote_addr, length, 0); @@ -182,9 +183,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv, return UCS_OK; } -void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) +void ucp_tag_offload_cancel(ucp_worker_t *worker, ucp_request_t *req, int force) { - ucp_worker_iface_t *ucp_iface = ucp_tag_offload_iface(ctx); + + ucp_worker_iface_t *ucp_iface = ucp_tag_offload_iface(worker); ucs_status_t status; status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, @@ -194,19 +196,20 @@ void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) ucs_status_string(status)); return; } - UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, CANCELED); + UCP_WORKER_STAT_TAG_OFFLOAD(worker, CANCELED); /* if cancel is not forced, need to wait its completion */ if (force) { - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); } } -int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) +int ucp_tag_offload_post(ucp_request_t *req) { - size_t length = req->recv.length; - ucp_mem_desc_t *rdesc = NULL; - ucp_worker_t *worker = req->recv.worker; + size_t length = req->recv.length; + ucp_mem_desc_t *rdesc = NULL; + ucp_worker_t *worker = req->recv.worker; + ucp_context_t *context = worker->context; ucp_worker_iface_t *ucp_iface; ucs_status_t status; ucp_rsc_index_t mdi; @@ -218,8 +221,8 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) return 0; } - if ((ctx->config.tag_sender_mask & req->recv.tag.tag_mask) != - ctx->config.tag_sender_mask) { + if ((context->config.tag_sender_mask & req->recv.tag.tag_mask) != + context->config.tag_sender_mask) { /* Wildcard. * TODO add check that only offload capable iface present. In * this case can post tag as well. */ @@ -227,7 +230,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) return 0; } - if (ctx->tm.offload.sw_req_count) { + if (worker->tm.offload.sw_req_count) { /* There are some requests which must be completed in SW. Do not post * tags to HW until they are completed. */ UCP_WORKER_STAT_TAG_OFFLOAD(worker, BLOCK_SW_PEND); @@ -238,9 +241,9 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) * posted to the transport */ ucs_assert(req->recv.state.offset == 0); - ucp_iface = ucp_tag_offload_iface(ctx); + ucp_iface = ucp_tag_offload_iface(worker); - if (ucs_unlikely(length >= ctx->tm.offload.zcopy_thresh)) { + if (ucs_unlikely(length >= worker->tm.offload.zcopy_thresh)) { if (length > ucp_iface->attr.cap.tag.recv.max_zcopy) { /* Post maximum allowed length. If sender sends smaller message * (which is allowed per MPI standard), max recv should fit it. @@ -251,23 +254,23 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) length = ucp_iface->attr.cap.tag.recv.max_zcopy; } - status = ucp_request_memory_reg(ctx, ucp_iface->rsc_index, req->recv.buffer, - length, req->recv.datatype, - &req->recv.state); + status = ucp_request_memory_reg(context, ucp_iface->rsc_index, + req->recv.buffer, length, + req->recv.datatype, &req->recv.state); if (status != UCS_OK) { return 0; } - req->recv.rdesc = NULL; - iov.buffer = (void*)req->recv.buffer; - iov.memh = req->recv.state.dt.contig[0].memh; + req->recv.rdesc = NULL; + iov.buffer = (void*)req->recv.buffer; + iov.memh = req->recv.state.dt.contig[0].memh; } else { rdesc = ucp_worker_mpool_get(worker); if (rdesc == NULL) { return 0; } - mdi = ctx->tl_rscs[ucp_iface->rsc_index].md_index; + mdi = context->tl_rscs[ucp_iface->rsc_index].md_index; iov.memh = ucp_memh2uct(rdesc->memh, mdi); iov.buffer = rdesc + 1; req->recv.rdesc = rdesc; @@ -286,7 +289,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) &req->recv.uct_ctx); if (status != UCS_OK) { /* No more matching entries in the transport. */ - ucp_tag_offload_release_buf(req, ctx); + ucp_tag_offload_release_buf(req, worker); UCP_WORKER_STAT_TAG_OFFLOAD(worker, BLOCK_TAG_EXCEED); return 0; } @@ -510,8 +513,7 @@ static UCS_F_ALWAYS_INLINE void ucp_tag_offload_sync_posted(ucp_worker_t *worker, ucp_request_t *req) { req->send.tag_offload.ssend_tag = req->send.tag; - ucs_queue_push(&worker->context->tm.offload.sync_reqs, - &req->send.tag_offload.queue); + ucs_queue_push(&worker->tm.offload.sync_reqs, &req->send.tag_offload.queue); } static ucs_status_t ucp_tag_offload_eager_sync_bcopy(uct_pending_req_t *self) diff --git a/src/ucp/tag/offload.h b/src/ucp/tag/offload.h index 69ec2644777..57fc20d6b6d 100644 --- a/src/ucp/tag/offload.h +++ b/src/ucp/tag/offload.h @@ -60,28 +60,28 @@ ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, uint64_t stag uint64_t remote_addr, size_t length, const void *rkey_buf); -void ucp_tag_offload_cancel(ucp_context_t *context, ucp_request_t *req, int force); +void ucp_tag_offload_cancel(ucp_worker_t *worker, ucp_request_t *req, int force); -int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req); +int ucp_tag_offload_post(ucp_request_t *req); static UCS_F_ALWAYS_INLINE void -ucp_tag_offload_try_post(ucp_context_t *ctx, ucp_request_t *req) +ucp_tag_offload_try_post(ucp_worker_t *worker, ucp_request_t *req) { - if (ucs_unlikely((req->recv.length >= ctx->tm.offload.thresh) && + if (ucs_unlikely((req->recv.length >= worker->tm.offload.thresh) && (req->recv.state.offset == 0))) { - if (ucp_tag_offload_post(ctx, req)) { + if (ucp_tag_offload_post(req)) { return; } } req->flags |= UCP_REQUEST_FLAG_BLOCK_OFFLOAD; - ++ctx->tm.offload.sw_req_count; + ++worker->tm.offload.sw_req_count; } static UCS_F_ALWAYS_INLINE void -ucp_tag_offload_try_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) +ucp_tag_offload_try_cancel(ucp_worker_t *worker, ucp_request_t *req, int force) { if (ucs_unlikely(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) { - ucp_tag_offload_cancel(ctx, req, force); + ucp_tag_offload_cancel(worker, req, force); } } diff --git a/src/ucp/tag/probe.c b/src/ucp/tag/probe.c index 2dd4340e9ec..da6e09fe196 100644 --- a/src/ucp/tag/probe.c +++ b/src/ucp/tag/probe.c @@ -14,7 +14,7 @@ static UCS_F_ALWAYS_INLINE ucp_recv_desc_t* -ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask, +ucp_tag_probe_search(ucp_worker_h worker, ucp_tag_t tag, uint64_t tag_mask, ucp_tag_recv_info_t *info, int remove) { ucp_recv_desc_t *rdesc; @@ -22,7 +22,7 @@ ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask, ucp_tag_t recv_tag; unsigned flags; - ucs_list_for_each(rdesc, &context->tm.unexpected.all, + ucs_list_for_each(rdesc, &worker->tm.unexpected.all, tag_list[UCP_RDESC_ALL_LIST]) { hdr = (void*)(rdesc + 1); recv_tag = hdr->tag; @@ -60,16 +60,14 @@ ucp_tag_message_h ucp_tag_probe_nb(ucp_worker_h worker, ucp_tag_t tag, ucp_tag_t tag_mask, int remove, ucp_tag_recv_info_t *info) { - ucp_context_h context = worker->context; + ucp_context_h UCS_V_UNUSED context = worker->context; ucp_recv_desc_t *ret; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&context->mt_lock); ucs_trace_req("probe_nb tag %"PRIx64"/%"PRIx64, tag, tag_mask); - ret = ucp_tag_probe_search(context, tag, tag_mask, info, remove); + ret = ucp_tag_probe_search(worker, tag, tag_mask, info, remove); - UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return ret; diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 1d36028f41a..1b71838ac81 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -562,34 +562,30 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, unsigned tl_flags) { - const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST | - UCP_RECV_DESC_FLAG_LAST | - UCP_RECV_DESC_FLAG_RNDV; - ucp_worker_h worker = arg; - ucp_rndv_rts_hdr_t *rndv_rts_hdr = data; - ucp_context_h context = worker->context; + const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST | + UCP_RECV_DESC_FLAG_LAST | + UCP_RECV_DESC_FLAG_RNDV; + ucp_worker_h worker = arg; + ucp_rndv_rts_hdr_t *rndv_rts_hdr = data; ucp_request_t *rreq; ucs_status_t status; - UCP_THREAD_CS_ENTER_CONDITIONAL(&context->mt_lock); - - rreq = ucp_tag_exp_search(&context->tm, rndv_rts_hdr->super.tag, + rreq = ucp_tag_exp_search(&worker->tm, rndv_rts_hdr->super.tag, rndv_rts_hdr->size, recv_flags); if (rreq != NULL) { ucp_rndv_matched(worker, rreq, rndv_rts_hdr); /* Cancel req in transport if it was offloaded, because it arrived as unexpected */ - ucp_tag_offload_try_cancel(context, rreq, 1); + ucp_tag_offload_try_cancel(worker, rreq, 1); UCP_WORKER_STAT_RNDV(worker, EXP); status = UCS_OK; } else { - status = ucp_tag_unexp_recv(&context->tm, worker, data, length, tl_flags, + status = ucp_tag_unexp_recv(&worker->tm, worker, data, length, tl_flags, sizeof(*rndv_rts_hdr), recv_flags); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); return status; } diff --git a/src/ucp/tag/tag_match.c b/src/ucp/tag/tag_match.c index f5c87ffa2a4..d12402e3046 100644 --- a/src/ucp/tag/tag_match.c +++ b/src/ucp/tag/tag_match.c @@ -57,13 +57,12 @@ int ucp_tag_unexp_is_empty(ucp_tag_match_t *tm) void ucp_tag_exp_remove(ucp_tag_match_t *tm, ucp_request_t *req) { ucs_queue_head_t *queue = ucp_tag_exp_get_req_queue(tm, req); - ucp_context_t *ctx = ucs_container_of(tm, ucp_context_t, tm); ucs_queue_iter_t iter; ucp_request_t *qreq; ucs_queue_for_each_safe(qreq, iter, queue, recv.queue) { if (qreq == req) { - ucp_tag_offload_try_cancel(ctx, req, 0); + ucp_tag_offload_try_cancel(req->recv.worker, req, 0); ucs_queue_del_iter(queue, iter); return; } diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index 710607b8096..742def01a7a 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -29,7 +29,6 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, ucp_tag_recv_callback_t cb, ucp_recv_desc_t *first_rdesc, unsigned *save_rreq) { - ucp_context_h context = worker->context; ucp_recv_desc_t *rdesc, *next; ucs_list_link_t *list; ucs_status_t status; @@ -38,26 +37,26 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, int i_list; /* fast check of global unexpected queue */ - if (ucs_list_is_empty(&context->tm.unexpected.all)) { + if (ucs_list_is_empty(&worker->tm.unexpected.all)) { return UCS_INPROGRESS; } if (first_rdesc == NULL) { if (tag_mask == UCP_TAG_MASK_FULL) { - list = ucp_tag_unexp_get_list_for_tag(&context->tm, tag); + list = ucp_tag_unexp_get_list_for_tag(&worker->tm, tag); if (ucs_list_is_empty(list)) { return UCS_INPROGRESS; } i_list = UCP_RDESC_HASH_LIST; } else { - list = &context->tm.unexpected.all; + list = &worker->tm.unexpected.all; i_list = UCP_RDESC_ALL_LIST; } rdesc = ucs_list_head(list, ucp_recv_desc_t, tag_list[i_list]); } else { ucs_assert(tag_mask == UCP_TAG_MASK_FULL); - list = ucp_tag_unexp_get_list_for_tag(&context->tm, tag); + list = ucp_tag_unexp_get_list_for_tag(&worker->tm, tag); i_list = UCP_RDESC_HASH_LIST; rdesc = first_rdesc; } @@ -144,7 +143,7 @@ ucp_tag_recv_request_completed(ucp_request_t *req, ucs_status_t status, req->status = status; if (req->flags & UCP_REQUEST_FLAG_BLOCK_OFFLOAD) { - --req->recv.worker->context->tm.offload.sw_req_count; + --req->recv.worker->tm.offload.sw_req_count; } if ((req->flags |= UCP_REQUEST_FLAG_COMPLETED) & UCP_REQUEST_FLAG_RELEASED) { ucp_request_put(req); @@ -160,7 +159,6 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, { unsigned save_rreq = 1; ucs_queue_head_t *queue; - ucp_context_h context; ucs_status_t status; size_t buffer_size; @@ -183,8 +181,7 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, } else if (save_rreq) { /* If not found on unexpected, wait until it arrives. * If was found but need this receive request for later completion, save it */ - context = worker->context; - queue = ucp_tag_exp_get_queue(&context->tm, tag, tag_mask); + queue = ucp_tag_exp_get_queue(&worker->tm, tag, tag_mask); req->recv.buffer = buffer; req->recv.length = buffer_size; @@ -193,11 +190,11 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, req->recv.tag.tag_mask = tag_mask; req->recv.tag.cb = cb; - ucp_tag_exp_push(&context->tm, queue, req); + ucp_tag_exp_push(&worker->tm, queue, req); /* If offload supported, post this tag to transport as well. * TODO: need to distinguish the cases when posting is not needed. */ - ucp_tag_offload_try_post(worker->context, req); + ucp_tag_offload_try_post(worker, req); ucs_trace_req("%s returning expected request %p (%p)", debug_name, req, req + 1); } @@ -215,13 +212,11 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_recv_nbr, ucs_status_t status; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); status = ucp_tag_recv_common(worker, buffer, count, datatype, tag, tag_mask, req, UCP_REQUEST_DEBUG_FLAG_EXTERNAL, NULL, NULL, "recv_nbr"); - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return status; } @@ -236,7 +231,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_recv_nb, ucp_request_t *req; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); req = ucp_request_get(worker); if (ucs_likely(req != NULL)) { @@ -247,7 +241,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_recv_nb, ret = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return ret; } @@ -263,7 +256,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_msg_recv_nb, ucp_request_t *req; UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock); req = ucp_request_get(worker); if (ucs_likely(req != NULL)) { @@ -276,7 +268,6 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_msg_recv_nb, ret = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); } - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); return ret; } diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 3ec9b8a0195..b811ff41d34 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -549,7 +549,7 @@ static int ucp_wireup_tag_lane_supported(ucp_ep_h ep, { return ((ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG) && (ep->worker->context->config.ext.tm_offload == tm_config_mode) && - !ucs_queue_is_empty(&ep->worker->context->tm.offload.ifaces) && + !ucs_queue_is_empty(&ep->worker->tm.offload.ifaces) && /* TODO: remove check below when UCP_ERR_HANDLING_MODE_PEER supports * RNDV-protocol or HW TM supports fragmented protocols */ diff --git a/test/gtest/ucp/test_ucp_tag.cc b/test/gtest/ucp/test_ucp_tag.cc index dfeeee6d731..4b8f4d67209 100644 --- a/test/gtest/ucp/test_ucp_tag.cc +++ b/test/gtest/ucp/test_ucp_tag.cc @@ -9,7 +9,7 @@ #include extern "C" { -#include +#include } @@ -136,14 +136,14 @@ void test_ucp_tag::wait_and_validate(request *req) request_release(req); } -void test_ucp_tag::wait_for_unexpected_msg(ucp_context_h context, double sec) +void test_ucp_tag::wait_for_unexpected_msg(ucp_worker_h worker, double sec) { /* Wait for some message to be added to unexpected queue */ ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(sec); do { short_progress_loop(); - } while (ucp_tag_unexp_is_empty(&context->tm) && (ucs_get_time() < timeout)); + } while (ucp_tag_unexp_is_empty(&worker->tm) && (ucs_get_time() < timeout)); } test_ucp_tag::request * diff --git a/test/gtest/ucp/test_ucp_tag.h b/test/gtest/ucp/test_ucp_tag.h index e83d6ba00e4..64288d83988 100644 --- a/test/gtest/ucp/test_ucp_tag.h +++ b/test/gtest/ucp/test_ucp_tag.h @@ -85,7 +85,7 @@ class test_ucp_tag : public ucp_test { void wait_and_validate(request *req); - void wait_for_unexpected_msg(ucp_context_h context, double sec); + void wait_for_unexpected_msg(ucp_worker_h worker, double sec); static void* dt_common_start(size_t count); diff --git a/test/gtest/ucp/test_ucp_tag_probe.cc b/test/gtest/ucp/test_ucp_tag_probe.cc index 1441247f8b9..28700c3071c 100644 --- a/test/gtest/ucp/test_ucp_tag_probe.cc +++ b/test/gtest/ucp/test_ucp_tag_probe.cc @@ -147,7 +147,7 @@ UCS_TEST_P(test_ucp_tag_probe, send_rndv_msg_probe, "RNDV_THRESH=1048576") { ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req)); /* receiver - get the RTS and put it into unexpected */ - wait_for_unexpected_msg(receiver().ucph(), 10.0); + wait_for_unexpected_msg(receiver().worker(), 10.0); /* receiver - match the rts, remove it from unexpected and return it */ message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff, 1, &info); diff --git a/test/gtest/ucp/test_ucp_tag_xfer.cc b/test/gtest/ucp/test_ucp_tag_xfer.cc index 10d411e256d..6d086b70ff6 100644 --- a/test/gtest/ucp/test_ucp_tag_xfer.cc +++ b/test/gtest/ucp/test_ucp_tag_xfer.cc @@ -445,7 +445,7 @@ size_t test_ucp_tag_xfer::do_xfer(const void *sendbuf, void *recvbuf, } else { sreq = do_send(sendbuf, count, send_dt, sync); - wait_for_unexpected_msg(receiver().ucph(), 10.0); + wait_for_unexpected_msg(receiver().worker(), 10.0); if (sync) { EXPECT_FALSE(sreq->completed);