Skip to content

Commit

Permalink
UCP/DT/STREAM: Add function to unpack request data and unpack single …
Browse files Browse the repository at this point in the history
…fragment.

- Stream receive to maintain its own receive offset for the unpack
  function - as preparation to not using common receive offset for all
  protocols. For example, multi-lane tag-matching will not not need it.
- Add ucp_request_recv_data_unpack() to unpack and update the receive
  request state.
- Add ucp_dt_unpack_only() to unpack single fragment into a buffer.
  • Loading branch information
yosefe committed Jan 12, 2018
1 parent b5a98e1 commit 60befde
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 40 deletions.
1 change: 1 addition & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ struct ucp_request {

struct {
ucp_stream_recv_callback_t cb; /* Completion callback */
size_t offset; /* Receive data offset */
size_t length; /* Completion info to fill */
} stream;
};
Expand Down
71 changes: 66 additions & 5 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <ucp/dt/dt.h>
#include <ucs/debug/profile.h>
#include <ucs/datastruct/mpool.inl>
#include <ucp/dt/dt.inl>
#include <inttypes.h>


Expand Down Expand Up @@ -104,9 +105,9 @@ ucp_request_complete_stream_recv(ucp_request_t *req,
ucs_queue_pull_elem_non_empty(&ep_stream->match_q, ucp_request_t,
recv.queue);
ucs_assert(check_req == req);
ucs_assert(req->recv.state.offset > 0);
ucs_assert(req->recv.stream.offset > 0);

req->recv.stream.length = req->recv.state.offset;
req->recv.stream.length = req->recv.stream.offset;
ucs_trace_req("completing stream receive request %p (%p) "
UCP_REQUEST_FLAGS_FMT" count %zu, %s",
req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
Expand All @@ -120,17 +121,17 @@ ucp_request_can_complete_stream_recv(ucp_request_t *req)
{
/* NOTE: first check is needed to avoid heavy "%" operation if request is
* completely filled */
if (req->recv.state.offset == req->recv.length) {
if (req->recv.stream.offset == req->recv.length) {
return 1;
}

/* 0-length stream recv is meaningless if this was not requested explicitely */
if (req->recv.state.offset == 0) {
if (req->recv.stream.offset == 0) {
return 0;
}

if (ucs_likely(UCP_DT_IS_CONTIG(req->recv.datatype))) {
return req->recv.state.offset %
return req->recv.stream.offset %
ucp_contig_dt_elem_size(req->recv.datatype) == 0;
}

Expand Down Expand Up @@ -385,6 +386,66 @@ ucp_request_wait_uct_comp(ucp_request_t *req)
}
}

/**
* Unpack receive data to a request
*
* req - receive request
* data - data to unpack
* length -
* offset - offset of received data within the request, for OOO fragments
*
*
*/
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_recv_data_unpack(ucp_request_t *req, const void *data,
size_t length, size_t offset, int last)
{
ucp_dt_generic_t *dt_gen;
ucs_status_t status;

ucp_trace_req(req, "unpack recv_data req_len %zu data_len %zu offset %zu last: %s",
req->recv.length, length, offset, last ? "yes" : "no");

if (ucs_unlikely((length + offset) > req->recv.length)) {
ucs_debug("message truncated: recv_length %zu offset %zu buffer_size %zu",
length, offset, req->recv.length);
if (UCP_DT_IS_GENERIC(req->recv.datatype)) {
dt_gen = ucp_dt_generic(req->recv.datatype);
UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
req->recv.state.dt.generic.state);
}
return UCS_ERR_MESSAGE_TRUNCATED;
}

switch (req->recv.datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, req->recv.buffer + offset,
data, length);
return UCS_OK;

case UCP_DATATYPE_IOV:
UCS_PROFILE_CALL(ucp_dt_iov_scatter, req->recv.buffer,
req->recv.state.dt.iov.iovcnt, data, length,
&req->recv.state.dt.iov.iov_offset,
&req->recv.state.dt.iov.iovcnt_offset);
return UCS_OK;

case UCP_DATATYPE_GENERIC:
dt_gen = ucp_dt_generic(req->recv.datatype);
status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack,
req->recv.state.dt.generic.state,
offset, data, length);
if (last) {
UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
req->recv.state.dt.generic.state);
}
return status;

default:
ucs_fatal("unexpected datatype=%lx", req->recv.datatype);
}
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_recv_desc_init(ucp_worker_h worker, void *data, size_t length,
unsigned am_flags, uint16_t hdr_len, uint16_t rdesc_flags,
Expand Down
59 changes: 58 additions & 1 deletion src/ucp/dt/dt.inl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
* See file LICENSE for terms.
*/

#ifndef UCP_DT_INL_
#define UCP_DT_INL_

#include <ucs/debug/profile.h>

/**
Expand Down Expand Up @@ -84,6 +87,58 @@ ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size,
}
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_dt_unpack_only(void *buffer, size_t count, ucp_datatype_t datatype,
const void *data, size_t length, int truncation)
{
size_t iov_offset, iovcnt_offset;
ucp_dt_generic_t *dt_gen;
ucs_status_t status;
size_t buffer_size;
void *state;

switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
if (truncation &&
ucs_unlikely(length > (buffer_size = ucp_contig_dt_length(datatype, count)))) {
goto err_truncated;
}
UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer, data, length);
return UCS_OK;

case UCP_DATATYPE_IOV:
if (truncation &&
ucs_unlikely(length > (buffer_size = ucp_dt_iov_length(buffer, count)))) {
goto err_truncated;
}
iov_offset = iovcnt_offset = 0;
UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, count, data, length,
&iov_offset, &iovcnt_offset);
return UCS_OK;

case UCP_DATATYPE_GENERIC:
dt_gen = ucp_dt_generic(datatype);
state = UCS_PROFILE_NAMED_CALL("dt_start", dt_gen->ops.start_unpack,
dt_gen->context, buffer, count);
if (truncation &&
ucs_unlikely(length > (buffer_size = dt_gen->ops.packed_size(state)))) {
goto err_truncated;
}
status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack, state,
0, data, length);
UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, state);
return status;

default:
ucs_fatal("unexpected datatype=%lx", datatype);
}

err_truncated:
ucs_debug("message truncated: recv_length %zu buffer_size %zu", length,
buffer_size);
return UCS_ERR_MESSAGE_TRUNCATED;
}

static UCS_F_ALWAYS_INLINE void
ucp_dt_recv_state_init(ucp_dt_state_t *dt_state, void *buffer,
ucp_datatype_t dt, size_t dt_count)
Expand All @@ -104,7 +159,7 @@ ucp_dt_recv_state_init(ucp_dt_state_t *dt_state, void *buffer,
break;
case UCP_DATATYPE_GENERIC:
dt_gen = ucp_dt_generic(dt);
dt_state->dt.generic.state =
dt_state->dt.generic.state =
UCS_PROFILE_NAMED_CALL("dt_start", dt_gen->ops.start_unpack,
dt_gen->context, buffer, dt_count);
ucs_trace("dt state %p buffer %p count %zu dt_gen state=%p", dt_state,
Expand All @@ -114,3 +169,5 @@ ucp_dt_recv_state_init(ucp_dt_state_t *dt_state, void *buffer,
break;
}
}

#endif
37 changes: 18 additions & 19 deletions src/ucp/stream/stream_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,13 @@ ucp_stream_rdata_unpack(const void *rdata, size_t length, ucp_request_t *dst_req
{
/* Truncated error is not actual for stream, need to adjust */
size_t valid_len = ucs_min((dst_req->recv.length -
dst_req->recv.state.offset), length);
dst_req->recv.stream.offset), length);
ucs_status_t status;

status = ucp_dt_unpack(dst_req->recv.datatype, dst_req->recv.buffer,
dst_req->recv.length, &dst_req->recv.state,
rdata, valid_len, UCP_RECV_DESC_FLAG_LAST);

status = ucp_request_recv_data_unpack(dst_req, rdata, valid_len,
dst_req->recv.stream.offset, 1);
if (ucs_likely(status == UCS_OK)) {
dst_req->recv.state.offset += valid_len;
dst_req->recv.stream.offset += valid_len;
ucs_trace_data("unpacked %zd bytes of stream data %p",
valid_len, rdata);
return valid_len;
Expand Down Expand Up @@ -191,16 +189,14 @@ ucp_stream_rdesc_advance(ucp_recv_desc_t *rdesc, ssize_t offset,

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_process_rdesc_inplace(ucp_recv_desc_t *rdesc, ucp_datatype_t dt,
void *buffer, size_t length,
ucp_ep_ext_stream_t *ep_stream,
ucp_dt_state_t *state)
void *buffer, size_t count, size_t length,
ucp_ep_ext_stream_t *ep_stream)
{
ucs_status_t status;
ssize_t unpacked;

status = ucp_dt_unpack(dt, buffer, length, state,
ucp_stream_rdesc_payload(rdesc), length,
UCP_RECV_DESC_FLAG_LAST);
status = ucp_dt_unpack_only(buffer, count, dt,
ucp_stream_rdesc_payload(rdesc), length, 0);
unpacked = ucs_likely(status == UCS_OK) ? length : status;

return ucp_stream_rdesc_advance(rdesc, unpacked, ep_stream);
Expand All @@ -214,7 +210,7 @@ ucp_stream_process_rdesc(ucp_recv_desc_t *rdesc, ucp_ep_ext_stream_t *ep_stream,

unpacked = ucp_stream_rdata_unpack(ucp_stream_rdesc_payload(rdesc),
rdesc->length, req);
ucs_assert(req->recv.state.offset <= req->recv.length);
ucs_assert(req->recv.stream.offset <= req->recv.length);

return ucp_stream_rdesc_advance(rdesc, unpacked, ep_stream);
}
Expand All @@ -226,8 +222,12 @@ ucp_stream_recv_request_init(ucp_request_t *req, void *buffer, size_t count,
{
req->flags = UCP_REQUEST_FLAG_CALLBACK |
UCP_REQUEST_FLAG_STREAM_RECV;
#if ENABLE_ASSERT
req->status = UCS_OK; /* for ucp_request_recv_data_unpack() */
#endif
req->recv.stream.cb = cb;
req->recv.stream.length = 0;
req->recv.stream.offset = 0;

req->recv.buffer = buffer;
req->recv.datatype = datatype;
Expand Down Expand Up @@ -266,10 +266,9 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb,
UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock);

if (ucs_likely(ucp_stream_recv_nb_is_inplace(ep_stream, dt_length))) {
ucp_dt_recv_state_init(&dt_state, buffer, datatype, count);
status = ucp_stream_process_rdesc_inplace(ucp_stream_rdesc_get(ep_stream),
datatype, buffer, dt_length,
ep_stream, &dt_state);
datatype, buffer, count,
dt_length, ep_stream);
*length = dt_length;
goto out_status;
}
Expand All @@ -283,7 +282,7 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb,
ucp_stream_recv_request_init(req, buffer, count, dt_length, datatype, cb);

/* OK, lets obtain all arrived data which matches the recv size */
while ((req->recv.state.offset < req->recv.length) &&
while ((req->recv.stream.offset < req->recv.length) &&
(ep_stream->flags & UCP_EP_STREAM_FLAG_HAS_DATA)) {

rdesc = ucp_stream_rdesc_get(ep_stream);
Expand All @@ -293,10 +292,10 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb,
}
}

ucs_assert(req->recv.state.offset <= req->recv.length);
ucs_assert(req->recv.stream.offset <= req->recv.length);

if (ucp_request_can_complete_stream_recv(req)) {
*length = req->recv.state.offset;
*length = req->recv.stream.offset;
} else {
ucs_assert(!(ep_stream->flags & UCP_EP_STREAM_FLAG_HAS_DATA));
ucs_queue_push(&ep_stream->match_q, &req->recv.queue);
Expand Down
7 changes: 3 additions & 4 deletions src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,
if (req != NULL) {
UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len);

status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer,
req->recv.length, &req->recv.state,
data + hdr_len, recv_len,
flags & UCP_RECV_DESC_FLAG_LAST);
status = ucp_request_recv_data_unpack(req, data + hdr_len, recv_len,
req->recv.state.offset,
flags & UCP_RECV_DESC_FLAG_LAST);

/* First fragment fills the receive information */
if (flags & UCP_RECV_DESC_FLAG_FIRST) {
Expand Down
6 changes: 2 additions & 4 deletions src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,8 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag,
}

if (req->recv.tag.rdesc != NULL) {
status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer,
req->recv.length, &req->recv.state,
req->recv.tag.rdesc + 1, length,
UCP_RECV_DESC_FLAG_LAST);
status = ucp_request_recv_data_unpack(req, req->recv.tag.rdesc + 1,
length, 0, 1);
ucs_mpool_put_inline(req->recv.tag.rdesc);
} else {
ucp_request_recv_buffer_dereg(req);
Expand Down
11 changes: 4 additions & 7 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,9 +830,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_handler,
}

UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_recv", recv_len);
status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer,
rreq->recv.length, &rreq->recv.state,
data + hdr_len, recv_len, 0);
status = ucp_request_recv_data_unpack(rreq, data + hdr_len, recv_len,
rreq->recv.state.offset, 0);
if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
rreq->recv.state.offset += recv_len;
return status;
Expand Down Expand Up @@ -864,10 +863,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_last_handler,
ucs_assert(rreq->recv.tag.info.length ==
rreq->recv.state.offset + recv_len);
UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_last_recv", recv_len);
status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer,
rreq->recv.length, &rreq->recv.state,
data + hdr_len, recv_len,
UCP_RECV_DESC_FLAG_LAST);
status = ucp_request_recv_data_unpack(rreq, data + hdr_len, recv_len,
rreq->recv.state.offset, 1);
} else {
ucs_trace_data("drop last segment for rreq %p, length %zu, status %s",
rreq, recv_len, ucs_status_string(rreq->status));
Expand Down

0 comments on commit 60befde

Please sign in to comment.