From 5fb526906a2bfc695bf4deea85006bde05b73a59 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Sun, 28 May 2017 14:23:51 +0300 Subject: [PATCH 1/5] UCP: tm offload rndv support --- src/ucp/core/ucp_request.h | 5 + src/ucp/core/ucp_worker.c | 3 +- src/ucp/core/ucp_worker.h | 4 +- src/ucp/tag/eager_snd.c | 2 +- src/ucp/tag/offload.c | 134 +++++++++++++++++++++++++++ src/ucp/tag/offload.h | 18 ++++ src/ucp/tag/rndv.c | 63 ++++++++++--- src/ucp/tag/rndv.h | 10 +- src/ucp/tag/tag_send.c | 7 +- src/ucp/wireup/stub_ep.c | 1 + test/gtest/ucp/test_ucp_tag_probe.cc | 4 +- 11 files changed, 230 insertions(+), 21 deletions(-) diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 695e5c04acc..a17a0d88512 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -127,6 +127,11 @@ struct ucp_request { uint64_t value; void *result; } amo; + struct { + void *rndv_op; /* Handler of issued rndv send. Needs to cancel the + operation if it is completed by SW */ + } tag_offload; + }; ucp_lane_index_t lane; /* Lane on which this request is being sent */ diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 4a2f12e4e5a..8f9fd8c3463 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -255,8 +255,9 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker, iface_params.stats_root = UCS_STATS_RVAL(worker->stats); iface_params.rx_headroom = rx_headroom; iface_params.cpu_mask = *cpu_mask_param; - iface_params.eager_arg = worker; + iface_params.eager_arg = iface_params.rndv_arg = worker; iface_params.eager_cb = ucp_tag_offload_unexp_eager; + iface_params.rndv_cb = ucp_tag_offload_unexp_rndv; /* Open UCT interface */ status = uct_iface_open(context->tl_mds[resource->md_index].md, worker->uct, diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 971ebadd48e..fc811958305 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -145,7 +145,9 @@ static inline ucp_ep_h ucp_worker_ep_find(ucp_worker_h worker, uint64_t dest_uui static UCS_F_ALWAYS_INLINE uint64_t ucp_worker_is_tl_tag_offload(ucp_worker_h worker, ucp_rsc_index_t rsc_index) { - return 0; /* Stub for now, offload TM RNDV is not implemented yet */ + return (worker->ifaces[rsc_index].attr.cap.flags & + (UCT_IFACE_FLAG_TAG_EAGER_SHORT | UCT_IFACE_FLAG_TAG_EAGER_BCOPY | + UCT_IFACE_FLAG_TAG_EAGER_ZCOPY | UCT_IFACE_FLAG_TAG_RNDV_ZCOPY)); } #endif diff --git a/src/ucp/tag/eager_snd.c b/src/ucp/tag/eager_snd.c index 0214cbf4b9a..c2174f3380f 100644 --- a/src/ucp/tag/eager_snd.c +++ b/src/ucp/tag/eager_snd.c @@ -199,7 +199,7 @@ static ucs_status_t ucp_tag_eager_zcopy_multi(uct_pending_req_t *self) } void ucp_tag_eager_zcopy_completion(uct_completion_t *self, - ucs_status_t status) + ucs_status_t status) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct_comp); ucp_tag_eager_zcopy_req_complete(req); diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index 22eb37797ec..ded24bd340c 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -44,6 +44,64 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, ucp_request_complete_recv(req, status); } +/* RNDV request matched by the transport. Need to proceed with AM based RNDV */ +void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag, + const void *header, unsigned header_length, + ucs_status_t status) +{ + ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx); + ucp_context_t *ctx = req->recv.worker->context; + ucp_sw_rndv_hdr_t *sreq = (ucp_sw_rndv_hdr_t*)header; + ucp_worker_iface_t *iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, + ucp_worker_iface_t, queue); + ucp_rndv_rts_hdr_t rts; + + rts.sreq = sreq->super; + rts.super.tag = stag; + rts.address = 0; /* RNDV needs to be completed in SW */ + rts.size = sreq->length; + + ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype, + &req->recv.state); + ucp_rndv_matched(req->recv.worker, req, &rts); +} + +ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, + uint64_t stag, const void *hdr, + unsigned hdr_length, + uint64_t remote_addr, size_t length, + const void *rkey_buf) +{ + ucp_rndv_rts_hdr_t *rts = (ucp_rndv_rts_hdr_t*)(((ucp_tag_hdr_t*)hdr) - 1); + ucp_worker_t *worker = arg; + void *rkey = rts + 1; + size_t len = sizeof(*rts); + ucp_ep_t *ep = ucp_worker_get_reply_ep(worker, rts->sreq.sender_uuid); + const uct_md_attr_t *md_attrs; + size_t rkey_size; + + /* rts.req should be alredy in place - it is sent by the sender. + * Fill the rest of rts header and pass to common rts handler */ + if (rkey_buf) { + /* Copy rkey before to fill rts, to avoid overriding rkey */ + md_attrs = ucp_ep_md_attr(ep, ucp_ep_get_tag_lane(ep)); + rkey_size = md_attrs->rkey_packed_size; + memcpy(rkey, rkey_buf, rkey_size); + len += rkey_size; + rts->flags = UCP_RNDV_RTS_FLAG_PACKED_RKEY | UCP_RNDV_RTS_FLAG_OFFLOAD; + rts->size = length; + } else { + ucs_assert(remote_addr == 0ul); + /* This must be SW RNDV request. Take length from its header. */ + rts->size = ((ucp_sw_rndv_hdr_t*)hdr)->length; + } + + rts->super.tag = stag; + rts->address = remote_addr; + + return ucp_rndv_rts_handler(arg, rts, len, flags, UCP_RECV_DESC_FLAG_OFFLOAD); +} + void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) { ucp_worker_iface_t *ucp_iface; @@ -99,6 +157,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req) req->recv.uct_ctx.tag_consumed_cb = ucp_tag_offload_tag_consumed; req->recv.uct_ctx.completed_cb = ucp_tag_offload_completed; + req->recv.uct_ctx.rndv_cb = ucp_tag_offload_rndv_cb; iov.buffer = (void*)req->recv.buffer; iov.length = length; @@ -213,6 +272,81 @@ static ucs_status_t ucp_tag_offload_eager_zcopy(uct_pending_req_t *self) return ucp_do_tag_offload_zcopy(self, 0ul, ucp_tag_eager_zcopy_req_complete); } +ucs_status_t ucp_tag_offload_sw_rndv(uct_pending_req_t *self) +{ + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); + ucp_ep_t *ep = req->send.ep; + ucp_sw_rndv_hdr_t rndv_hdr = { + .super.sender_uuid = req->send.ep->worker->uuid, + .super.reqptr = (uintptr_t)req, + .length = req->send.length + }; + + return uct_ep_tag_rndv_request(ep->uct_eps[req->send.lane], req->send.tag, + &rndv_hdr, sizeof(rndv_hdr)); +} + +ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self) +{ + void *rndv_op; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); + ucp_ep_t *ep = req->send.ep; + size_t max_iov = ucp_ep_config(ep)->tag.eager.max_iov; + uct_iov_t *iov = ucs_alloca(max_iov * sizeof(uct_iov_t)); + size_t iovcnt = 0; + ucp_request_hdr_t rndv_hdr = { + .sender_uuid = ep->worker->uuid, + .reqptr = (uintptr_t)req + }; + + req->send.uct_comp.count = 1; + req->send.uct_comp.func = ucp_tag_eager_zcopy_completion; + + ucs_assert_always(UCP_DT_IS_CONTIG(req->send.datatype)); + ucp_dt_iov_copy_uct(iov, &iovcnt, max_iov, &req->send.state, req->send.buffer, + req->send.datatype, req->send.length); + + rndv_op = uct_ep_tag_rndv_zcopy(ep->uct_eps[req->send.lane], req->send.tag, + &rndv_hdr, sizeof(rndv_hdr), iov, iovcnt, + &req->send.uct_comp); + if (UCS_PTR_IS_ERR(rndv_op)) { + return UCS_PTR_STATUS(rndv_op); + } + req->flags |= UCP_REQUEST_FLAG_OFFLOADED; + req->send.tag_offload.rndv_op = rndv_op; + return UCS_OK; +} + +void ucp_tag_offload_cancel_rndv(ucp_request_t *req) +{ + ucp_ep_t *ep = req->send.ep; + ucs_status_t status; + + status = uct_ep_tag_rndv_cancel(ep->uct_eps[ucp_ep_get_tag_lane(ep)], + req->send.tag_offload.rndv_op); + if (status != UCS_OK) { + ucs_error("Failed to cancel tag rndv op %s", ucs_status_string(status)); + } +} + +ucs_status_t ucp_tag_offload_start_rndv(ucp_request_t *sreq) +{ + ucs_status_t status; + ucp_lane_index_t lane = ucp_ep_get_tag_lane(sreq->send.ep); + + sreq->send.lane = lane; + if (UCP_DT_IS_CONTIG(sreq->send.datatype)) { + status = ucp_request_send_buffer_reg(sreq, lane); + if (status != UCS_OK) { + return status; + } + sreq->send.uct.func = ucp_tag_offload_rndv_zcopy; + } else { + sreq->send.uct.func = ucp_tag_offload_sw_rndv; + } + return UCS_OK; +} + const ucp_proto_t ucp_tag_offload_proto = { .contig_short = ucp_tag_offload_eager_short, .bcopy_single = ucp_tag_offload_eager_bcopy, diff --git a/src/ucp/tag/offload.h b/src/ucp/tag/offload.h index 2805890677b..0a1665f1496 100644 --- a/src/ucp/tag/offload.h +++ b/src/ucp/tag/offload.h @@ -12,13 +12,31 @@ #include +typedef struct ucp_sw_rndv_hdr { + ucp_request_hdr_t super; + size_t length; +} UCS_S_PACKED ucp_sw_rndv_hdr_t; + + extern const ucp_proto_t ucp_tag_offload_proto; extern const ucp_proto_t ucp_tag_offload_sync_proto; +ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self); + +void ucp_tag_offload_cancel_rndv(ucp_request_t *req); + +ucs_status_t ucp_tag_offload_start_rndv(ucp_request_t *sreq); + ucs_status_t ucp_tag_offload_unexp_eager(void *arg, void *data, size_t length, unsigned flags, uct_tag_t stag, uint64_t imm); + +ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, uint64_t stag, + const void *hdr, unsigned hdr_length, + uint64_t remote_addr, size_t length, + const void *rkey_buf); + void ucp_tag_offload_cancel(ucp_context_t *context, ucp_request_t *req, int force); int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req); diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index f84936ec433..e2b2a568370 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -7,6 +7,7 @@ #include "rndv.h" #include "tag_match.inl" +#include "offload.h" #include #include #include @@ -121,8 +122,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rtr, (self), return status; } -void ucp_tag_send_start_rndv(ucp_request_t *sreq) +ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq) { + ucs_status_t status; + ucs_trace_req("starting rndv. sreq: %p. buffer: %p, length: %zu", sreq, sreq->send.buffer, sreq->send.length); sreq->flags |= UCP_REQUEST_FLAG_RNDV; @@ -132,8 +135,15 @@ void ucp_tag_send_start_rndv(ucp_request_t *sreq) if (UCP_DT_IS_CONTIG(sreq->send.datatype)) { sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; } - - sreq->send.uct.func = ucp_proto_progress_rndv_rts; + if (sreq->send.ep->flags & UCP_EP_FLAG_TAG_OFFLOAD_ENABLED) { + status = ucp_tag_offload_start_rndv(sreq); + if (status != UCS_OK) { + return status; + } + } else { + sreq->send.uct.func = ucp_proto_progress_rndv_rts; + } + return UCS_OK; } static void ucp_rndv_send_ats(ucp_request_t *rndv_req, uintptr_t remote_request) @@ -398,7 +408,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), } if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) { - if ((rndv_rts_hdr->address != 0) && ucp_ep_is_rndv_lane_present(ep)) { + if ((rndv_rts_hdr->address != 0) && (ucp_ep_is_rndv_lane_present(ep) || + (rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_OFFLOAD))) { /* read the data from the sender with a get_zcopy operation on the * rndv lane */ ucp_rndv_handle_recv_contig(rndv_req, rreq, rndv_rts_hdr); @@ -421,12 +432,14 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), } UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler, - (arg, data, length, am_flags), - void *arg, void *data, size_t length, unsigned am_flags) + (arg, data, length, tl_flags, desc_flags), + void *arg, void *data, size_t length, unsigned tl_flags, + unsigned desc_flags) { const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST | - UCP_RECV_DESC_FLAG_LAST | - UCP_RECV_DESC_FLAG_RNDV; + UCP_RECV_DESC_FLAG_LAST | + UCP_RECV_DESC_FLAG_RNDV | + desc_flags; ucp_worker_h worker = arg; ucp_rndv_rts_hdr_t *rndv_rts_hdr = data; ucp_context_h context = worker->context; @@ -440,16 +453,27 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler, if (rreq != NULL) { ucp_rndv_matched(worker, rreq, rndv_rts_hdr); + /* Cancel req in transport if it was offloaded, because it arrived + as unexpected */ + if (recv_flags & UCP_RECV_DESC_FLAG_OFFLOAD) { + ucp_tag_offload_cancel(context, rreq, 1); + } + UCP_WORKER_STAT_RNDV(worker, EXP); status = UCS_OK; } else { - status = ucp_tag_unexp_recv(&context->tm, worker, data, length, am_flags, + status = ucp_tag_unexp_recv(&context->tm, worker, data, length, tl_flags, sizeof(*rndv_rts_hdr), recv_flags); } UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); return status; } +ucs_status_t ucp_rndv_rts_handler_wrap(void *arg, void *data, size_t length, + unsigned tl_flags) +{ + return ucp_rndv_rts_handler(arg, data, length, tl_flags, 0); +} UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler, (arg, data, length, flags), @@ -460,7 +484,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler, /* dereg the original send request and set it to complete */ UCS_PROFILE_REQUEST_EVENT(sreq, "rndv_ats_recv", 0); - ucp_rndv_rma_request_send_buffer_dereg(sreq); + if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) { + ucp_tag_offload_cancel_rndv(sreq); + ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep)); + } else { + ucp_rndv_rma_request_send_buffer_dereg(sreq); + } ucp_request_send_generic_dt_finish(sreq); ucp_request_complete_send(sreq, UCS_OK); return UCS_OK; @@ -591,7 +620,11 @@ static void ucp_rndv_prepare_zcopy_send_buffer(ucp_request_t *sreq, ucp_ep_h ep) { ucs_status_t status; - if ((ucp_ep_is_rndv_lane_present(ep)) && + if ((sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) && + (ucp_ep_get_am_lane(ep) != ucp_ep_get_tag_lane(ep))) { + ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep)); + sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL; + } else if ((ucp_ep_is_rndv_lane_present(ep)) && (ucp_ep_get_am_lane(ep) != ucp_ep_get_rndv_get_lane(ep))) { /* dereg the original send request since we are going to send on the AM lane next */ ucp_rndv_rma_request_send_buffer_dereg(sreq); @@ -636,6 +669,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rtr_handler, ucs_assert_always(!ucp_ep_is_stub(ep)); ucs_trace_req("RTR received. start sending on sreq %p", sreq); + if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) { + /* Do not deregister memory here, because am zcopy rndv may + * need it registered (if am and tag is the same lane). */ + ucp_tag_offload_cancel_rndv(sreq); + } + if ((UCP_DT_IS_CONTIG(sreq->send.datatype)) && (sreq->send.length >= ucp_ep_config(ep)->am.zcopy_thresh[0])) { /* send with zcopy */ @@ -751,7 +790,7 @@ static void ucp_rndv_dump(ucp_worker_h worker, uct_am_trace_type_t type, } } -UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTS, ucp_rndv_rts_handler, +UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTS, ucp_rndv_rts_handler_wrap, ucp_rndv_dump, UCT_AM_CB_FLAG_SYNC); UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_ATS, ucp_rndv_ats_handler, ucp_rndv_dump, UCT_AM_CB_FLAG_SYNC); diff --git a/src/ucp/tag/rndv.h b/src/ucp/tag/rndv.h index 002141e6003..db04ad58b8d 100644 --- a/src/ucp/tag/rndv.h +++ b/src/ucp/tag/rndv.h @@ -14,7 +14,8 @@ #include enum { - UCP_RNDV_RTS_FLAG_PACKED_RKEY = UCS_BIT(0) + UCP_RNDV_RTS_FLAG_PACKED_RKEY = UCS_BIT(0), + UCP_RNDV_RTS_FLAG_OFFLOAD = UCS_BIT(1) }; /* @@ -45,13 +46,18 @@ typedef struct { } UCS_S_PACKED ucp_rndv_data_hdr_t; -void ucp_tag_send_start_rndv(ucp_request_t *req); +ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *req); void ucp_rndv_matched(ucp_worker_h worker, ucp_request_t *req, ucp_rndv_rts_hdr_t *rndv_rts_hdr); ucs_status_t ucp_proto_progress_rndv_get_zcopy(uct_pending_req_t *self); +ucs_status_t +ucp_rndv_rts_handler(void *arg, void *data, size_t length, unsigned tl_flags, + unsigned desc_flags); + + static inline size_t ucp_rndv_total_len(ucp_rndv_rts_hdr_t *hdr) { return hdr->size; diff --git a/src/ucp/tag/tag_send.c b/src/ucp/tag/tag_send.c index 714d8bead50..06d5b7d561e 100644 --- a/src/ucp/tag/tag_send.c +++ b/src/ucp/tag/tag_send.c @@ -42,7 +42,7 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count, req->send.state.dt.iov.iov_offset = 0; req->send.state.dt.iov.iovcnt = count; flag_iov_single = (count <= config->tag.eager.max_iov); - + if (!flag_iov_single && ucp_ep_is_tag_offload_enabled(config)) { /* Make sure SW RNDV will be used, becasuse tag offload does * not support multi-packet eager protocols. */ @@ -85,7 +85,10 @@ static ucs_status_t ucp_tag_req_start(ucp_request_t *req, size_t count, } else if ((length >= rndv_rma_thresh) || (length >= rndv_am_thresh) || force_sw_rndv) { /* RMA/AM rendezvous */ - ucp_tag_send_start_rndv(req); + status = ucp_tag_send_start_rndv(req); + if (status != UCS_OK) { + return status; + } UCS_PROFILE_REQUEST_EVENT(req, "start_rndv", req->send.length); } else if (length < zcopy_thresh) { /* bcopy */ diff --git a/src/ucp/wireup/stub_ep.c b/src/ucp/wireup/stub_ep.c index b2e1f5917e1..68be9f57788 100644 --- a/src/ucp/wireup/stub_ep.c +++ b/src/ucp/wireup/stub_ep.c @@ -253,6 +253,7 @@ static uct_iface_t ucp_stub_iface = { .ep_tag_eager_bcopy = (void*)ucp_stub_ep_bcopy_send_func, .ep_tag_eager_zcopy = (void*)ucp_stub_ep_send_func, .ep_tag_rndv_zcopy = (void*)ucs_empty_function_return_ptr_no_resource, + .ep_tag_rndv_request = (void*)ucp_stub_ep_send_func, .ep_atomic_add64 = (void*)ucp_stub_ep_send_func, .ep_atomic_fadd64 = (void*)ucp_stub_ep_send_func, .ep_atomic_swap64 = (void*)ucp_stub_ep_send_func, diff --git a/test/gtest/ucp/test_ucp_tag_probe.cc b/test/gtest/ucp/test_ucp_tag_probe.cc index 9d66ad73d80..8e09ce995bf 100644 --- a/test/gtest/ucp/test_ucp_tag_probe.cc +++ b/test/gtest/ucp/test_ucp_tag_probe.cc @@ -43,7 +43,7 @@ class test_ucp_tag_probe : public test_ucp_tag { 0x111337); } else { - send_b(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337); + send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337); } do { @@ -79,7 +79,7 @@ class test_ucp_tag_probe : public test_ucp_tag { } request_release(recv_req); - if (is_sync) { + if (UCS_PTR_IS_PTR(send_req)) { wait(send_req); EXPECT_TRUE(send_req->completed); EXPECT_EQ(UCS_OK, send_req->status); From eb0a6a07b40927cbbcdf3f3d91f191ea694cde11 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Sun, 28 May 2017 16:54:43 +0300 Subject: [PATCH 2/5] UCP: tm rndv coverity fix --- src/ucp/tag/offload.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index ded24bd340c..a7385896edc 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -58,11 +58,13 @@ void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag, rts.sreq = sreq->super; rts.super.tag = stag; + rts.flags = 0; rts.address = 0; /* RNDV needs to be completed in SW */ rts.size = sreq->length; ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype, &req->recv.state); + /* coverity[address_of] */ ucp_rndv_matched(req->recv.worker, req, &rts); } From c647f8a2f9d38f94f6805ba129d64d34d30dbb95 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Sun, 28 May 2017 20:33:24 +0300 Subject: [PATCH 3/5] UCP: tm offload rndv fixes p1 --- src/ucp/tag/offload.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index a7385896edc..edf92b9577d 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -290,7 +290,6 @@ ucs_status_t ucp_tag_offload_sw_rndv(uct_pending_req_t *self) ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self) { - void *rndv_op; ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; size_t max_iov = ucp_ep_config(ep)->tag.eager.max_iov; @@ -300,6 +299,7 @@ ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self) .sender_uuid = ep->worker->uuid, .reqptr = (uintptr_t)req }; + void *rndv_op; req->send.uct_comp.count = 1; req->send.uct_comp.func = ucp_tag_eager_zcopy_completion; From bd9c7b8b7350b0959dab34b68a0dfe7925d13e59 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Mon, 29 May 2017 21:20:01 +0300 Subject: [PATCH 4/5] UCP: tm offload rndv fixes p2 --- src/ucp/tag/eager_rcv.c | 2 ++ src/ucp/tag/offload.c | 71 +++++++++++++++++++++------------------ src/ucp/tag/rndv.c | 23 ++++++------- src/ucp/tag/rndv.h | 5 ++- src/ucp/tag/tag_match.inl | 7 ++-- 5 files changed, 57 insertions(+), 51 deletions(-) diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index a9f4b3ab1a7..88ecd8a865b 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -75,6 +75,8 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, * because it arrived as unexpected */ if (flags & UCP_RECV_DESC_FLAG_OFFLOAD) { ucp_tag_offload_cancel(context, req, 1); + } else { + ucs_assert(!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)); } if (flags & UCP_RECV_DESC_FLAG_LAST) { diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index edf92b9577d..e8d8cc779d0 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -74,34 +74,40 @@ ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, uint64_t remote_addr, size_t length, const void *rkey_buf) { - ucp_rndv_rts_hdr_t *rts = (ucp_rndv_rts_hdr_t*)(((ucp_tag_hdr_t*)hdr) - 1); - ucp_worker_t *worker = arg; - void *rkey = rts + 1; - size_t len = sizeof(*rts); - ucp_ep_t *ep = ucp_worker_get_reply_ep(worker, rts->sreq.sender_uuid); - const uct_md_attr_t *md_attrs; - size_t rkey_size; - - /* rts.req should be alredy in place - it is sent by the sender. - * Fill the rest of rts header and pass to common rts handler */ - if (rkey_buf) { - /* Copy rkey before to fill rts, to avoid overriding rkey */ - md_attrs = ucp_ep_md_attr(ep, ucp_ep_get_tag_lane(ep)); - rkey_size = md_attrs->rkey_packed_size; - memcpy(rkey, rkey_buf, rkey_size); - len += rkey_size; - rts->flags = UCP_RNDV_RTS_FLAG_PACKED_RKEY | UCP_RNDV_RTS_FLAG_OFFLOAD; + ucp_worker_t *worker = arg; + ucp_request_hdr_t *rndv_hdr = (ucp_request_hdr_t*)hdr; + ucp_ep_t *ep = ucp_worker_get_reply_ep(worker, rndv_hdr->sender_uuid); + const uct_md_attr_t *md_attr = ucp_ep_md_attr(ep, ucp_ep_get_tag_lane(ep)); + size_t rkey_size = rkey_buf ? md_attr->rkey_packed_size : 0; + size_t len = sizeof(ucp_rndv_rts_hdr_t) + rkey_size; + ucp_rndv_rts_hdr_t *rts = ucs_alloca(len); + ucp_sw_rndv_hdr_t *sw_rndv_hdr; + + /* Fill RTS to emulate SW RNDV flow. */ + rts->super.tag = stag; + rts->sreq = *rndv_hdr; + rts->address = remote_addr; + + if (remote_addr) { rts->size = length; + rts->flags = UCP_RNDV_RTS_FLAG_OFFLOAD; + if (rkey_buf) { + memcpy(rts + 1, rkey_buf, rkey_size); + len += rkey_size; + rts->flags |= UCP_RNDV_RTS_FLAG_PACKED_RKEY; + } } else { - ucs_assert(remote_addr == 0ul); /* This must be SW RNDV request. Take length from its header. */ - rts->size = ((ucp_sw_rndv_hdr_t*)hdr)->length; + sw_rndv_hdr = ucs_derived_of(hdr, ucp_sw_rndv_hdr_t); + rts->size = sw_rndv_hdr->length; + rts->flags = 0; } - rts->super.tag = stag; - rts->address = remote_addr; + /* Pass 0 as tl flags, because RTS needs to be stored in UCP pool. */ + ucp_rndv_process_rts(arg, rts, len, 0); - return ucp_rndv_rts_handler(arg, rts, len, flags, UCP_RECV_DESC_FLAG_OFFLOAD); + /* Always return UCS_OK, since RNDV hdr should be stored in UCP mpool. */ + return UCS_OK; } void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) @@ -109,16 +115,17 @@ void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) ucp_worker_iface_t *ucp_iface; ucs_status_t status; - ucs_assert(req->flags & UCP_REQUEST_FLAG_OFFLOADED); - - ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, - ucp_worker_iface_t, queue); - ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype, - &req->recv.state); - status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, force); - if (status != UCS_OK) { - ucs_error("Failed to cancel recv in the transport: %s", - ucs_status_string(status)); + if (req->flags & UCP_REQUEST_FLAG_OFFLOADED) { + ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, + ucp_worker_iface_t, queue); + ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype, + &req->recv.state); + status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, + force); + if (status != UCS_OK) { + ucs_error("Failed to cancel recv in the transport: %s", + ucs_status_string(status)); + } } } diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index e2b2a568370..d92992fe61c 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -431,15 +431,13 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), UCS_ASYNC_UNBLOCK(&worker->async); } -UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler, - (arg, data, length, tl_flags, desc_flags), - void *arg, void *data, size_t length, unsigned tl_flags, - unsigned desc_flags) +UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_process_rts, + (arg, data, length, tl_flags), + void *arg, void *data, size_t length, unsigned tl_flags) { const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST | UCP_RECV_DESC_FLAG_LAST | - UCP_RECV_DESC_FLAG_RNDV | - desc_flags; + UCP_RECV_DESC_FLAG_RNDV; ucp_worker_h worker = arg; ucp_rndv_rts_hdr_t *rndv_rts_hdr = data; ucp_context_h context = worker->context; @@ -455,9 +453,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler, /* Cancel req in transport if it was offloaded, because it arrived as unexpected */ - if (recv_flags & UCP_RECV_DESC_FLAG_OFFLOAD) { - ucp_tag_offload_cancel(context, rreq, 1); - } + ucp_tag_offload_cancel(context, rreq, 1); UCP_WORKER_STAT_RNDV(worker, EXP); status = UCS_OK; @@ -469,10 +465,11 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler, UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock); return status; } -ucs_status_t ucp_rndv_rts_handler_wrap(void *arg, void *data, size_t length, - unsigned tl_flags) + +ucs_status_t ucp_rndv_rts_handler(void *arg, void *data, size_t length, + unsigned tl_flags) { - return ucp_rndv_rts_handler(arg, data, length, tl_flags, 0); + return ucp_rndv_process_rts(arg, data, length, tl_flags); } UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler, @@ -790,7 +787,7 @@ static void ucp_rndv_dump(ucp_worker_h worker, uct_am_trace_type_t type, } } -UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTS, ucp_rndv_rts_handler_wrap, +UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTS, ucp_rndv_rts_handler, ucp_rndv_dump, UCT_AM_CB_FLAG_SYNC); UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_ATS, ucp_rndv_ats_handler, ucp_rndv_dump, UCT_AM_CB_FLAG_SYNC); diff --git a/src/ucp/tag/rndv.h b/src/ucp/tag/rndv.h index db04ad58b8d..525e60d8a03 100644 --- a/src/ucp/tag/rndv.h +++ b/src/ucp/tag/rndv.h @@ -53,9 +53,8 @@ void ucp_rndv_matched(ucp_worker_h worker, ucp_request_t *req, ucs_status_t ucp_proto_progress_rndv_get_zcopy(uct_pending_req_t *self); -ucs_status_t -ucp_rndv_rts_handler(void *arg, void *data, size_t length, unsigned tl_flags, - unsigned desc_flags); +ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, + unsigned tl_flags); static inline size_t ucp_rndv_total_len(ucp_rndv_rts_hdr_t *hdr) diff --git a/src/ucp/tag/tag_match.inl b/src/ucp/tag/tag_match.inl index e370910d102..f1b3929a979 100644 --- a/src/ucp/tag/tag_match.inl +++ b/src/ucp/tag/tag_match.inl @@ -156,14 +156,15 @@ ucp_tag_unexp_recv(ucp_tag_match_t *tm, ucp_worker_h worker, void *data, size_t length, unsigned am_flags, uint16_t hdr_len, uint16_t flags) { - ucp_recv_desc_t *rdesc = (ucp_recv_desc_t *)data - 1; + ucp_recv_desc_t *rdesc; ucs_list_link_t *hash_list; ucs_status_t status; if (ucs_unlikely(am_flags & UCT_CB_FLAG_DESC)) { - /* desc==data is slowpath */ + /* slowpath */ + rdesc = (ucp_recv_desc_t *)data - 1; rdesc->flags = flags | UCP_RECV_DESC_FLAG_UCT_DESC; - status = UCS_INPROGRESS; + status = UCS_INPROGRESS; } else { rdesc = (ucp_recv_desc_t*)ucs_mpool_get_inline(&worker->am_mp); if (rdesc == NULL) { From 2ba0e9a7aa052d72b0ae0d9bae7d9842fd92cf52 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Thu, 1 Jun 2017 17:09:34 +0300 Subject: [PATCH 5/5] UCP: tm offload rndv fixes p3 --- src/ucp/tag/offload.c | 35 +++++++++++++++++++---------------- src/ucp/tag/rndv.c | 10 +++++----- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index e8d8cc779d0..fa960fa6786 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -56,6 +56,7 @@ void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag, ucp_worker_iface_t, queue); ucp_rndv_rts_hdr_t rts; + /* Emulate RTS without rkey (to be handled as AM-based RNDV). */ rts.sreq = sreq->super; rts.super.tag = stag; rts.flags = 0; @@ -68,11 +69,11 @@ void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag, ucp_rndv_matched(req->recv.worker, req, &rts); } -ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, - uint64_t stag, const void *hdr, - unsigned hdr_length, - uint64_t remote_addr, size_t length, - const void *rkey_buf) +UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv, + (arg, flags, stag, hdr, hdr_length, remote_addr, length, rkey_buf), + void *arg, unsigned flags, uint64_t stag, const void *hdr, + unsigned hdr_length, uint64_t remote_addr, size_t length, + const void *rkey_buf) { ucp_worker_t *worker = arg; ucp_request_hdr_t *rndv_hdr = (ucp_request_hdr_t*)hdr; @@ -115,17 +116,19 @@ void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force) ucp_worker_iface_t *ucp_iface; ucs_status_t status; - if (req->flags & UCP_REQUEST_FLAG_OFFLOADED) { - ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, - ucp_worker_iface_t, queue); - ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype, - &req->recv.state); - status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, - force); - if (status != UCS_OK) { - ucs_error("Failed to cancel recv in the transport: %s", - ucs_status_string(status)); - } + if (!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) { + return; + } + + ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces, + ucp_worker_iface_t, queue); + ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype, + &req->recv.state); + status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, + force); + if (status != UCS_OK) { + ucs_error("Failed to cancel recv in the transport: %s", + ucs_status_string(status)); } } diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index d92992fe61c..b68443631de 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -431,9 +431,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), UCS_ASYNC_UNBLOCK(&worker->async); } -UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_process_rts, - (arg, data, length, tl_flags), - void *arg, void *data, size_t length, unsigned tl_flags) +ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, + unsigned tl_flags) { const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST | UCP_RECV_DESC_FLAG_LAST | @@ -466,8 +465,9 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_process_rts, return status; } -ucs_status_t ucp_rndv_rts_handler(void *arg, void *data, size_t length, - unsigned tl_flags) +UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler, + (arg, data, length, tl_flags), + void *arg, void *data, size_t length, unsigned tl_flags) { return ucp_rndv_process_rts(arg, data, length, tl_flags); }