From 0533bfa82577830d1eae65511ad4aa296a9685ea Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Fri, 12 Jan 2018 20:42:03 +0200 Subject: [PATCH] UCP/DT/STREAM: Add function to unpack request data and unpack single fragment. - Stream receive to maintain its own receive offset for the unpack function - as preparation to not using common receive offset for all protocols. For example, multi-lane tag-matching will not not need it. - Add ucp_request_recv_data_unpack() to unpack and update the receive request state. - Add ucp_dt_unpack_only() to unpack single fragment into a buffer. --- src/ucp/core/ucp_request.h | 1 + src/ucp/core/ucp_request.inl | 71 +++++++++++++++++++++++++++++++++--- src/ucp/dt/dt.inl | 60 +++++++++++++++++++++++++++++- src/ucp/stream/stream_recv.c | 37 +++++++++---------- src/ucp/tag/eager_rcv.c | 7 ++-- src/ucp/tag/offload.c | 6 +-- src/ucp/tag/rndv.c | 11 ++---- 7 files changed, 153 insertions(+), 40 deletions(-) diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 7ebe41fc38f..4f055381281 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -217,6 +217,7 @@ struct ucp_request { struct { ucp_stream_recv_callback_t cb; /* Completion callback */ + size_t offset; /* Receive data offset */ size_t length; /* Completion info to fill */ } stream; }; diff --git a/src/ucp/core/ucp_request.inl b/src/ucp/core/ucp_request.inl index 6d24af2f0a8..e479e894bbe 100644 --- a/src/ucp/core/ucp_request.inl +++ b/src/ucp/core/ucp_request.inl @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -104,9 +105,9 @@ ucp_request_complete_stream_recv(ucp_request_t *req, ucs_queue_pull_elem_non_empty(&ep_stream->match_q, ucp_request_t, recv.queue); ucs_assert(check_req == req); - ucs_assert(req->recv.state.offset > 0); + ucs_assert(req->recv.stream.offset > 0); - req->recv.stream.length = req->recv.state.offset; + req->recv.stream.length = req->recv.stream.offset; ucs_trace_req("completing stream receive request %p (%p) " UCP_REQUEST_FLAGS_FMT" count %zu, %s", req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags), @@ -120,17 +121,17 @@ ucp_request_can_complete_stream_recv(ucp_request_t *req) { /* NOTE: first check is needed to avoid heavy "%" operation if request is * completely filled */ - if (req->recv.state.offset == req->recv.length) { + if (req->recv.stream.offset == req->recv.length) { return 1; } /* 0-length stream recv is meaningless if this was not requested explicitely */ - if (req->recv.state.offset == 0) { + if (req->recv.stream.offset == 0) { return 0; } if (ucs_likely(UCP_DT_IS_CONTIG(req->recv.datatype))) { - return req->recv.state.offset % + return req->recv.stream.offset % ucp_contig_dt_elem_size(req->recv.datatype) == 0; } @@ -385,6 +386,66 @@ ucp_request_wait_uct_comp(ucp_request_t *req) } } +/** + * Unpack receive data to a request + * + * req - receive request + * data - data to unpack + * length - + * offset - offset of received data within the request, for OOO fragments + * + * + */ +static UCS_F_ALWAYS_INLINE ucs_status_t +ucp_request_recv_data_unpack(ucp_request_t *req, const void *data, + size_t length, size_t offset, int last) +{ + ucp_dt_generic_t *dt_gen; + ucs_status_t status; + + ucp_trace_req(req, "unpack recv_data req_len %zu data_len %zu offset %zu last: %s", + req->recv.length, length, offset, last ? "yes" : "no"); + + if (ucs_unlikely((length + offset) > req->recv.length)) { + ucs_debug("message truncated: recv_length %zu offset %zu buffer_size %zu", + length, offset, req->recv.length); + if (UCP_DT_IS_GENERIC(req->recv.datatype) && last) { + dt_gen = ucp_dt_generic(req->recv.datatype); + UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, + req->recv.state.dt.generic.state); + } + return UCS_ERR_MESSAGE_TRUNCATED; + } + + switch (req->recv.datatype & UCP_DATATYPE_CLASS_MASK) { + case UCP_DATATYPE_CONTIG: + UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, req->recv.buffer + offset, + data, length); + return UCS_OK; + + case UCP_DATATYPE_IOV: + UCS_PROFILE_CALL(ucp_dt_iov_scatter, req->recv.buffer, + req->recv.state.dt.iov.iovcnt, data, length, + &req->recv.state.dt.iov.iov_offset, + &req->recv.state.dt.iov.iovcnt_offset); + return UCS_OK; + + case UCP_DATATYPE_GENERIC: + dt_gen = ucp_dt_generic(req->recv.datatype); + status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack, + req->recv.state.dt.generic.state, + offset, data, length); + if (last) { + UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, + req->recv.state.dt.generic.state); + } + return status; + + default: + ucs_fatal("unexpected datatype=%lx", req->recv.datatype); + } +} + static UCS_F_ALWAYS_INLINE ucs_status_t ucp_recv_desc_init(ucp_worker_h worker, void *data, size_t length, unsigned am_flags, uint16_t hdr_len, uint16_t rdesc_flags, diff --git a/src/ucp/dt/dt.inl b/src/ucp/dt/dt.inl index 0054e21a637..4bc482cb4df 100644 --- a/src/ucp/dt/dt.inl +++ b/src/ucp/dt/dt.inl @@ -4,6 +4,9 @@ * See file LICENSE for terms. */ +#ifndef UCP_DT_INL_ +#define UCP_DT_INL_ + #include /** @@ -84,6 +87,59 @@ ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size, } } +static UCS_F_ALWAYS_INLINE ucs_status_t +ucp_dt_unpack_only(void *buffer, size_t count, ucp_datatype_t datatype, + const void *data, size_t length, int truncation) +{ + size_t iov_offset, iovcnt_offset; + ucp_dt_generic_t *dt_gen; + ucs_status_t status; + size_t buffer_size; + void *state; + + switch (datatype & UCP_DATATYPE_CLASS_MASK) { + case UCP_DATATYPE_CONTIG: + if (truncation && + ucs_unlikely(length > (buffer_size = ucp_contig_dt_length(datatype, count)))) { + goto err_truncated; + } + UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer, data, length); + return UCS_OK; + + case UCP_DATATYPE_IOV: + if (truncation && + ucs_unlikely(length > (buffer_size = ucp_dt_iov_length(buffer, count)))) { + goto err_truncated; + } + iov_offset = iovcnt_offset = 0; + UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, count, data, length, + &iov_offset, &iovcnt_offset); + return UCS_OK; + + case UCP_DATATYPE_GENERIC: + dt_gen = ucp_dt_generic(datatype); + state = UCS_PROFILE_NAMED_CALL("dt_start", dt_gen->ops.start_unpack, + dt_gen->context, buffer, count); + if (truncation && + ucs_unlikely(length > (buffer_size = dt_gen->ops.packed_size(state)))) { + UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, state); + goto err_truncated; + } + status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack, state, + 0, data, length); + UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, state); + return status; + + default: + ucs_fatal("unexpected datatype=%lx", datatype); + } + +err_truncated: + ucs_debug("message truncated: recv_length %zu buffer_size %zu", length, + buffer_size); + return UCS_ERR_MESSAGE_TRUNCATED; +} + static UCS_F_ALWAYS_INLINE void ucp_dt_recv_state_init(ucp_dt_state_t *dt_state, void *buffer, ucp_datatype_t dt, size_t dt_count) @@ -104,7 +160,7 @@ ucp_dt_recv_state_init(ucp_dt_state_t *dt_state, void *buffer, break; case UCP_DATATYPE_GENERIC: dt_gen = ucp_dt_generic(dt); - dt_state->dt.generic.state = + dt_state->dt.generic.state = UCS_PROFILE_NAMED_CALL("dt_start", dt_gen->ops.start_unpack, dt_gen->context, buffer, dt_count); ucs_trace("dt state %p buffer %p count %zu dt_gen state=%p", dt_state, @@ -114,3 +170,5 @@ ucp_dt_recv_state_init(ucp_dt_state_t *dt_state, void *buffer, break; } } + +#endif diff --git a/src/ucp/stream/stream_recv.c b/src/ucp/stream/stream_recv.c index feb3a068cb1..2e8cea4b7ec 100644 --- a/src/ucp/stream/stream_recv.c +++ b/src/ucp/stream/stream_recv.c @@ -153,15 +153,13 @@ ucp_stream_rdata_unpack(const void *rdata, size_t length, ucp_request_t *dst_req { /* Truncated error is not actual for stream, need to adjust */ size_t valid_len = ucs_min((dst_req->recv.length - - dst_req->recv.state.offset), length); + dst_req->recv.stream.offset), length); ucs_status_t status; - status = ucp_dt_unpack(dst_req->recv.datatype, dst_req->recv.buffer, - dst_req->recv.length, &dst_req->recv.state, - rdata, valid_len, UCP_RECV_DESC_FLAG_LAST); - + status = ucp_request_recv_data_unpack(dst_req, rdata, valid_len, + dst_req->recv.stream.offset, 1); if (ucs_likely(status == UCS_OK)) { - dst_req->recv.state.offset += valid_len; + dst_req->recv.stream.offset += valid_len; ucs_trace_data("unpacked %zd bytes of stream data %p", valid_len, rdata); return valid_len; @@ -191,16 +189,14 @@ ucp_stream_rdesc_advance(ucp_recv_desc_t *rdesc, ssize_t offset, static UCS_F_ALWAYS_INLINE ucs_status_t ucp_stream_process_rdesc_inplace(ucp_recv_desc_t *rdesc, ucp_datatype_t dt, - void *buffer, size_t length, - ucp_ep_ext_stream_t *ep_stream, - ucp_dt_state_t *state) + void *buffer, size_t count, size_t length, + ucp_ep_ext_stream_t *ep_stream) { ucs_status_t status; ssize_t unpacked; - status = ucp_dt_unpack(dt, buffer, length, state, - ucp_stream_rdesc_payload(rdesc), length, - UCP_RECV_DESC_FLAG_LAST); + status = ucp_dt_unpack_only(buffer, count, dt, + ucp_stream_rdesc_payload(rdesc), length, 0); unpacked = ucs_likely(status == UCS_OK) ? length : status; return ucp_stream_rdesc_advance(rdesc, unpacked, ep_stream); @@ -214,7 +210,7 @@ ucp_stream_process_rdesc(ucp_recv_desc_t *rdesc, ucp_ep_ext_stream_t *ep_stream, unpacked = ucp_stream_rdata_unpack(ucp_stream_rdesc_payload(rdesc), rdesc->length, req); - ucs_assert(req->recv.state.offset <= req->recv.length); + ucs_assert(req->recv.stream.offset <= req->recv.length); return ucp_stream_rdesc_advance(rdesc, unpacked, ep_stream); } @@ -226,8 +222,12 @@ ucp_stream_recv_request_init(ucp_request_t *req, void *buffer, size_t count, { req->flags = UCP_REQUEST_FLAG_CALLBACK | UCP_REQUEST_FLAG_STREAM_RECV; +#if ENABLE_ASSERT + req->status = UCS_OK; /* for ucp_request_recv_data_unpack() */ +#endif req->recv.stream.cb = cb; req->recv.stream.length = 0; + req->recv.stream.offset = 0; req->recv.buffer = buffer; req->recv.datatype = datatype; @@ -266,10 +266,9 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb, UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); if (ucs_likely(ucp_stream_recv_nb_is_inplace(ep_stream, dt_length))) { - ucp_dt_recv_state_init(&dt_state, buffer, datatype, count); status = ucp_stream_process_rdesc_inplace(ucp_stream_rdesc_get(ep_stream), - datatype, buffer, dt_length, - ep_stream, &dt_state); + datatype, buffer, count, + dt_length, ep_stream); *length = dt_length; goto out_status; } @@ -283,7 +282,7 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb, ucp_stream_recv_request_init(req, buffer, count, dt_length, datatype, cb); /* OK, lets obtain all arrived data which matches the recv size */ - while ((req->recv.state.offset < req->recv.length) && + while ((req->recv.stream.offset < req->recv.length) && (ep_stream->flags & UCP_EP_STREAM_FLAG_HAS_DATA)) { rdesc = ucp_stream_rdesc_get(ep_stream); @@ -293,10 +292,10 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb, } } - ucs_assert(req->recv.state.offset <= req->recv.length); + ucs_assert(req->recv.stream.offset <= req->recv.length); if (ucp_request_can_complete_stream_recv(req)) { - *length = req->recv.state.offset; + *length = req->recv.stream.offset; } else { ucs_assert(!(ep_stream->flags & UCP_EP_STREAM_FLAG_HAS_DATA)); ucs_queue_push(&ep_stream->match_q, &req->recv.queue); diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index d3b3f31b9f9..278128e5577 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -39,10 +39,9 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, if (req != NULL) { UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len); - status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer, - req->recv.length, &req->recv.state, - data + hdr_len, recv_len, - flags & UCP_RECV_DESC_FLAG_LAST); + status = ucp_request_recv_data_unpack(req, data + hdr_len, recv_len, + req->recv.state.offset, + flags & UCP_RECV_DESC_FLAG_LAST); /* First fragment fills the receive information */ if (flags & UCP_RECV_DESC_FLAG_FIRST) { diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index ec208f50d31..6d033fa22f5 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -122,10 +122,8 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, } if (req->recv.tag.rdesc != NULL) { - status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer, - req->recv.length, &req->recv.state, - req->recv.tag.rdesc + 1, length, - UCP_RECV_DESC_FLAG_LAST); + status = ucp_request_recv_data_unpack(req, req->recv.tag.rdesc + 1, + length, 0, 1); ucs_mpool_put_inline(req->recv.tag.rdesc); } else { ucp_request_recv_buffer_dereg(req); diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 33fbcf10929..566a32d616a 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -830,9 +830,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_handler, } UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_recv", recv_len); - status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer, - rreq->recv.length, &rreq->recv.state, - data + hdr_len, recv_len, 0); + status = ucp_request_recv_data_unpack(rreq, data + hdr_len, recv_len, + rreq->recv.state.offset, 0); if ((status == UCS_OK) || (status == UCS_INPROGRESS)) { rreq->recv.state.offset += recv_len; return status; @@ -864,10 +863,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_last_handler, ucs_assert(rreq->recv.tag.info.length == rreq->recv.state.offset + recv_len); UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_last_recv", recv_len); - status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer, - rreq->recv.length, &rreq->recv.state, - data + hdr_len, recv_len, - UCP_RECV_DESC_FLAG_LAST); + status = ucp_request_recv_data_unpack(rreq, data + hdr_len, recv_len, + rreq->recv.state.offset, 1); } else { ucs_trace_data("drop last segment for rreq %p, length %zu, status %s", rreq, recv_len, ucs_status_string(rreq->status));