Skip to content

Commit

Permalink
Merge pull request openucx#6276 from brminich/topic/ucp_am_data_relea…
Browse files Browse the repository at this point in the history
…se_fix_v-1.10

UCP/AM: Fix releasing of deferred data - v1.10.x
  • Loading branch information
yosefe authored Feb 5, 2021
2 parents 8b32765 + a66c344 commit 2822e7c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 13 deletions.
29 changes: 17 additions & 12 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ UCS_PROFILE_FUNC_VOID(ucp_am_data_release, (worker, data),

if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_MALLOC)) {
/* Don't use UCS_PTR_BYTE_OFFSET here due to coverity false
* positive report. Need to step back by first_header size, where
* positive report. Need to step back by am_malloc_offset size, where
* originally allocated pointer resides. */
ucs_free((char*)rdesc - sizeof(ucp_am_first_hdr_t));
ucs_free((char*)rdesc - rdesc->am_malloc_offset);
return;
}

Expand Down Expand Up @@ -1133,6 +1133,7 @@ ucp_am_handler_common(ucp_worker_h worker, ucp_am_hdr_t *am_hdr, size_t hdr_size
ucp_recv_desc_t *desc = NULL;
void *data;
ucs_status_t status;
size_t payload_offset;

recv_flags |= (am_flags & UCT_CB_PARAM_FLAG_DESC) ?
UCP_AM_RECV_ATTR_FLAG_DATA : 0;
Expand All @@ -1150,12 +1151,13 @@ ucp_am_handler_common(ucp_worker_h worker, ucp_am_hdr_t *am_hdr, size_t hdr_size
}

ucs_assert(total_length >= am_hdr->header_length + hdr_size);
data = UCS_PTR_BYTE_OFFSET(am_hdr, hdr_size + am_hdr->header_length);
status = ucp_recv_desc_init(worker, data,
total_length - hdr_size - am_hdr->header_length,
0,
UCT_CB_PARAM_FLAG_DESC, /* pass as a const */
0, 0, -hdr_size, &desc);
data = UCS_PTR_BYTE_OFFSET(am_hdr,
hdr_size + am_hdr->header_length);
payload_offset = hdr_size + am_hdr->header_length;
status = ucp_recv_desc_init(worker, data,
total_length - payload_offset, 0,
UCT_CB_PARAM_FLAG_DESC, /* pass as a const */
0, 0, -payload_offset, &desc);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
ucs_error("worker %p could not allocate descriptor for active"
" message on callback : %u", worker, am_hdr->am_id);
Expand Down Expand Up @@ -1245,6 +1247,7 @@ ucp_am_handle_unfinished(ucp_worker_h worker, ucp_recv_desc_t *first_rdesc,
ucs_status_t status;
void *msg;
uint64_t recv_flags;
size_t desc_offset;

ucp_am_copy_data_fragment(first_rdesc, data, length, offset);

Expand Down Expand Up @@ -1281,10 +1284,12 @@ ucp_am_handle_unfinished(ucp_worker_h worker, ucp_recv_desc_t *first_rdesc,
* needed anymore,
* can overwrite)
*/
msg = UCS_PTR_BYTE_OFFSET(first_rdesc + 1,
first_rdesc->payload_offset);
first_rdesc = (ucp_recv_desc_t*)msg - 1;
first_rdesc->flags = UCP_RECV_DESC_FLAG_MALLOC;
desc_offset = first_rdesc->payload_offset;
msg = UCS_PTR_BYTE_OFFSET(first_rdesc + 1,
first_rdesc->payload_offset);
first_rdesc = (ucp_recv_desc_t*)msg - 1;
first_rdesc->flags = UCP_RECV_DESC_FLAG_MALLOC;
first_rdesc->am_malloc_offset = desc_offset;

return;
}
Expand Down
8 changes: 7 additions & 1 deletion src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,14 @@ struct ucp_recv_desc {
ucs_queue_elem_t am_mid_queue; /* AM middle fragments queue */
};
uint32_t length; /* Received length */
uint32_t payload_offset; /* Offset from end of the descriptor
union {
uint32_t payload_offset; /* Offset from end of the descriptor
* to AM data */
uint32_t am_malloc_offset; /* Offset from rdesc, holding
assembled multi-fragment active
message, to the originally
malloc'd buffer pointer */
};
uint16_t flags; /* Flags */
int16_t uct_desc_offset; /* Offset which needs to be
substructed from rdesc when
Expand Down
72 changes: 72 additions & 0 deletions test/gtest/ucp/test_ucp_am.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,78 @@ UCS_TEST_P(test_ucp_am_nbx_closed_ep, rx_rts_reply_am_on_closed_ep, "RNDV_THRESH
UCP_INSTANTIATE_TEST_CASE(test_ucp_am_nbx_closed_ep)


class test_ucp_am_nbx_eager_data_release : public test_ucp_am_nbx {
public:
test_ucp_am_nbx_eager_data_release()
{
modify_config("RNDV_THRESH", "inf");
modify_config("ZCOPY_THRESH", "inf");
m_data_ptr = NULL;
}

virtual ucs_status_t
am_data_handler(const void *header, size_t header_length, void *data,
size_t length, const ucp_am_recv_param_t *rx_param)
{
EXPECT_FALSE(m_am_received);

ucs_status_t status;

if (rx_param->recv_attr & UCP_AM_RECV_ATTR_FLAG_DATA) {
m_data_ptr = data;
status = UCS_INPROGRESS;
} else {
m_data_ptr = NULL;
status = UCS_OK;
}

m_am_received = true;

return status;
}

void test_data_release(size_t size)
{
size_t hdr_size = ucs_min(max_am_hdr(), 8);
test_am_send_recv(size);
if (m_data_ptr != NULL) {
ucp_am_data_release(receiver().worker(), m_data_ptr);
}

test_am_send_recv(size, hdr_size);
if (m_data_ptr != NULL) {
ucp_am_data_release(receiver().worker(), m_data_ptr);
}
}

size_t fragment_size()
{
return ucp_ep_config(sender().ep())->am.max_bcopy -
sizeof(ucp_am_hdr_t);
}

private:
void *m_data_ptr;
};

UCS_TEST_P(test_ucp_am_nbx_eager_data_release, short)
{
test_data_release(1);
}

UCS_TEST_P(test_ucp_am_nbx_eager_data_release, single)
{
test_data_release(fragment_size() / 2);
}

UCS_TEST_P(test_ucp_am_nbx_eager_data_release, multi)
{
test_data_release(fragment_size() * 2);
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_am_nbx_eager_data_release)


class test_ucp_am_nbx_dts : public test_ucp_am_nbx {
public:
static const uint64_t dts_bitmap = UCS_BIT(UCP_DATATYPE_CONTIG) |
Expand Down

0 comments on commit 2822e7c

Please sign in to comment.