Skip to content

Commit

Permalink
Merge pull request #7418 from hoopoepg/topic/rdesc-obj-str
Browse files Browse the repository at this point in the history
RECV/RDESC: added obj_str API implementation
  • Loading branch information
yosefe authored Sep 19, 2021
2 parents eb4cd26 + d21ec86 commit f98ff97
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 25 deletions.
19 changes: 12 additions & 7 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,8 @@ ucp_am_invoke_cb(ucp_worker_h worker, uint16_t am_id, void *user_hdr,

static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_handler_common(
ucp_worker_h worker, ucp_am_hdr_t *am_hdr, size_t total_length,
ucp_ep_h reply_ep, unsigned am_flags, uint64_t recv_flags)
ucp_ep_h reply_ep, unsigned am_flags, uint64_t recv_flags,
const char *name)
{
ucp_recv_desc_t *desc = NULL;
uint16_t am_id = am_hdr->am_id;
Expand Down Expand Up @@ -1228,7 +1229,7 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_handler_common(
desc_status = ucp_recv_desc_init(worker, data, data_length, 0, am_flags,
0, UCP_RECV_DESC_FLAG_AM_CB_INPROGRESS,
-(int)sizeof(*am_hdr),
worker->am.alignment, &desc);
worker->am.alignment, name, &desc);
if (ucs_unlikely(UCS_STATUS_IS_ERR(desc_status))) {
ucs_error("worker %p could not allocate descriptor for active"
" message on callback : %u",
Expand Down Expand Up @@ -1278,7 +1279,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_handler_reply,
"AM (reply proto)");

return ucp_am_handler_common(worker, hdr, am_length - sizeof(ftr), reply_ep,
am_flags, UCP_AM_RECV_ATTR_FIELD_REPLY_EP);
am_flags, UCP_AM_RECV_ATTR_FIELD_REPLY_EP,
"am_handler_reply");
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_am_handler,
Expand All @@ -1289,7 +1291,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_handler,
ucp_worker_h worker = am_arg;
ucp_am_hdr_t *hdr = am_data;

return ucp_am_handler_common(worker, hdr, am_length, NULL, am_flags, 0ul);
return ucp_am_handler_common(worker, hdr, am_length, NULL, am_flags, 0ul,
"am_handler");
}

static UCS_F_ALWAYS_INLINE ucp_recv_desc_t *
Expand Down Expand Up @@ -1432,7 +1435,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_first_handler,

return ucp_am_handler_common(worker, hdr,
am_length - sizeof(*first_ftr), ep,
am_flags, recv_flags);
am_flags, recv_flags,
"am_long_first_handler");
}

ep_ext = ucp_ep_ext_proto(ep);
Expand Down Expand Up @@ -1552,7 +1556,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_middle_handler,
* buffer is not allocated yet. When first fragment arrives (carrying total
* data size), all middle fragments will be copied to the data buffer. */
status = ucp_recv_desc_init(worker, am_data, am_length, 0, am_flags,
sizeof(*mid_hdr), 0, 0, 1, &mid_rdesc);
sizeof(*mid_hdr), 0, 0, 1,
"am_long_middle_handler", &mid_rdesc);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
ucs_error("worker %p could not allocate desc for assembling AM",
worker);
Expand Down Expand Up @@ -1607,7 +1612,7 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length,
desc_status = ucp_recv_desc_init(worker, data, length, 0, tl_flags, 0,
UCP_RECV_DESC_FLAG_RNDV |
UCP_RECV_DESC_FLAG_AM_CB_INPROGRESS, 0, 1,
&desc);
"am_rndv_process_rts", &desc);
if (ucs_unlikely(UCS_STATUS_IS_ERR(desc_status))) {
ucs_error("worker %p could not allocate descriptor for active"
" message RTS on callback %u", worker, am_id);
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ struct ucp_recv_desc {
AM memory pool or freeing it
in case of assembled
multi-fragment active message */
#if ENABLE_DEBUG_DATA
const char *name; /* Object name, debug only */
#endif
};


Expand Down
11 changes: 10 additions & 1 deletion src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,19 @@ ucp_request_recv_data_unpack(ucp_request_t *req, const void *data,
}
}

static UCS_F_ALWAYS_INLINE void
ucp_recv_desc_set_name(ucp_recv_desc_t *rdesc, const char *name)
{
#if ENABLE_DEBUG_DATA
rdesc->name = name;
#endif
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_recv_desc_init(ucp_worker_h worker, void *data, size_t length,
int data_offset, unsigned am_flags, uint16_t hdr_len,
uint16_t rdesc_flags, int priv_length, size_t alignment,
ucp_recv_desc_t **rdesc_p)
const char *name, ucp_recv_desc_t **rdesc_p)
{
ucp_recv_desc_t *rdesc;
void *data_hdr;
Expand All @@ -698,6 +706,7 @@ ucp_recv_desc_init(ucp_worker_h worker, void *data, size_t length,
return UCS_ERR_NO_MEMORY;
}

ucp_recv_desc_set_name(rdesc, name);
padding = ucs_padding((uintptr_t)(rdesc + 1), worker->am.alignment);
rdesc = (ucp_recv_desc_t*)UCS_PTR_BYTE_OFFSET(rdesc, padding);
rdesc->release_desc_offset = padding;
Expand Down
19 changes: 18 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ static ucs_stats_class_t ucp_worker_stats_class = {
};
#endif

static void ucp_am_mpool_obj_str(ucs_mpool_t *mp, void *obj,
ucs_string_buffer_t *strb);

ucs_mpool_ops_t ucp_am_mpool_ops = {
.chunk_alloc = ucs_mpool_hugetlb_malloc,
.chunk_release = ucs_mpool_hugetlb_free,
.obj_init = ucs_empty_function,
.obj_cleanup = ucs_empty_function,
.obj_str = NULL
.obj_str = ucp_am_mpool_obj_str
};

ucs_mpool_ops_t ucp_reg_mpool_ops = {
Expand Down Expand Up @@ -3124,3 +3127,17 @@ void ucp_worker_vfs_refresh(void *obj)
}
UCS_ASYNC_UNBLOCK(&worker->async);
}

static void ucp_am_mpool_obj_str(ucs_mpool_t *mp, void *obj,
ucs_string_buffer_t *strb)
{
ucp_recv_desc_t *rdesc = obj;

ucs_string_buffer_appendf(strb, "flags:0x%x length:%d payload_offset:%d "
"release_offset:%d", rdesc->flags, rdesc->length,
rdesc->payload_offset,
rdesc->release_desc_offset);
#if ENABLE_DEBUG_DATA
ucs_string_buffer_appendf(strb, " name:%s", rdesc->name);
#endif
}
1 change: 1 addition & 0 deletions src/ucp/stream/stream_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ ucp_stream_am_data_process(ucp_worker_t *worker, ucp_ep_ext_proto_t *ep_ext,
rdesc->payload_offset = sizeof(*rdesc) + sizeof(*am_data);
rdesc->flags = 0;
rdesc->release_desc_offset = 0;
ucp_recv_desc_set_name(rdesc, "stream_am_data_process");
memcpy(ucp_stream_rdesc_payload(rdesc),
UCS_PTR_BYTE_OFFSET(am_data, rdesc_tmp.payload_offset),
rdesc_tmp.length);
Expand Down
41 changes: 26 additions & 15 deletions src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ ucp_eager_expected_handler(ucp_worker_t *worker, ucp_request_t *req,

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_eager_offload_handler(void *arg, void *data, size_t length,
unsigned tl_flags, uint16_t flags, ucp_tag_t recv_tag)
unsigned tl_flags, uint16_t flags, ucp_tag_t recv_tag,
const char *name)
{
ucp_worker_t *worker = arg;
ucp_request_t *req;
Expand All @@ -58,7 +59,7 @@ ucp_eager_offload_handler(void *arg, void *data, size_t length,
} else {
status = ucp_recv_desc_init(worker, data, length, sizeof(ucp_tag_t),
tl_flags, sizeof(ucp_tag_t), flags,
sizeof(ucp_tag_t), 1, &rdesc);
sizeof(ucp_tag_t), 1, name, &rdesc);
if (!UCS_STATUS_IS_ERR(status)) {
rdesc_hdr = (ucp_tag_t*)(rdesc + 1);
*rdesc_hdr = recv_tag;
Expand All @@ -71,7 +72,8 @@ ucp_eager_offload_handler(void *arg, void *data, size_t length,

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_eager_tagged_handler(void *arg, void *data, size_t length, unsigned am_flags,
uint16_t flags, uint16_t hdr_len, uint16_t priv_length)
uint16_t flags, uint16_t hdr_len, uint16_t priv_length,
const char *name)
{
ucp_worker_h worker = arg;
ucp_eager_hdr_t *eager_hdr = data;
Expand Down Expand Up @@ -116,7 +118,7 @@ ucp_eager_tagged_handler(void *arg, void *data, size_t length, unsigned am_flags
status = UCS_OK;
} else {
status = ucp_recv_desc_init(worker, data, length, 0, am_flags, hdr_len,
flags, priv_length, 1, &rdesc);
flags, priv_length, 1, name, &rdesc);
if (!UCS_STATUS_IS_ERR(status)) {
ucp_tag_unexp_recv(&worker->tm, rdesc, recv_tag);
}
Expand All @@ -132,7 +134,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_only_handler,
return ucp_eager_tagged_handler(arg, data, length, am_flags,
UCP_RECV_DESC_FLAG_EAGER |
UCP_RECV_DESC_FLAG_EAGER_ONLY,
sizeof(ucp_eager_hdr_t), 0);
sizeof(ucp_eager_hdr_t), 0,
"eager_only_handler");
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_first_handler,
Expand All @@ -141,13 +144,15 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_first_handler,
{
return ucp_eager_tagged_handler(arg, data, length, am_flags,
UCP_RECV_DESC_FLAG_EAGER,
sizeof(ucp_eager_first_hdr_t), 0);
sizeof(ucp_eager_first_hdr_t), 0,
"eager_first_handler");
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_eager_common_middle_handler(ucp_worker_t *worker, void *data, size_t length,
uint16_t hdr_len, unsigned tl_flags,
uint16_t flags, uint16_t priv_length)
uint16_t flags, uint16_t priv_length,
const char *name)
{
ucp_eager_middle_hdr_t *hdr = data;
ucp_recv_desc_t *rdesc = NULL;
Expand All @@ -169,7 +174,8 @@ ucp_eager_common_middle_handler(ucp_worker_t *worker, void *data, size_t length,
if (ucp_tag_frag_match_is_unexp(matchq)) {
/* add new received descriptor to the queue */
status = ucp_recv_desc_init(worker, data, length, 0, tl_flags,
hdr_len, flags, priv_length, 1, &rdesc);
hdr_len, flags, priv_length, 1, name,
&rdesc);
if (ucs_likely(!UCS_STATUS_IS_ERR(status))) {
ucp_tag_frag_match_add_unexp(matchq, rdesc, hdr->offset);
} else if (ucs_queue_is_empty(&matchq->unexp_q)) {
Expand Down Expand Up @@ -219,7 +225,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_middle_handler,
return ucp_eager_common_middle_handler(arg, data, length,
sizeof(ucp_eager_middle_hdr_t),
am_flags, UCP_RECV_DESC_FLAG_EAGER,
0);
0, "eager_middle_handler");
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_sync_only_handler,
Expand All @@ -230,7 +236,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_sync_only_handler,
UCP_RECV_DESC_FLAG_EAGER|
UCP_RECV_DESC_FLAG_EAGER_ONLY|
UCP_RECV_DESC_FLAG_EAGER_SYNC,
sizeof(ucp_eager_sync_hdr_t), 0);
sizeof(ucp_eager_sync_hdr_t), 0,
"eager_sync_only_handler");
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_sync_first_handler,
Expand All @@ -240,7 +247,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_sync_first_handler,
return ucp_eager_tagged_handler(arg, data, length, am_flags,
UCP_RECV_DESC_FLAG_EAGER|
UCP_RECV_DESC_FLAG_EAGER_SYNC,
sizeof(ucp_eager_sync_first_hdr_t), 0);
sizeof(ucp_eager_sync_first_hdr_t), 0,
"eager_sync_first_handler");
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_eager_offload_sync_ack_handler,
Expand Down Expand Up @@ -327,7 +335,8 @@ ucp_tag_offload_eager_first_handler(ucp_worker_h worker, void *data,
priv->total_len = SIZE_MAX; /* length is not known at this point */
priv->msg_id = msg_ctx;
return ucp_eager_tagged_handler(worker, priv, length + priv_len,
tl_flags, flags, priv_len, priv_len);
tl_flags, flags, priv_len, priv_len,
"tag_offload_eager_first_handler");
}

static UCS_F_ALWAYS_INLINE ucs_status_t
Expand Down Expand Up @@ -368,7 +377,8 @@ ucp_tag_offload_eager_middle_handler(ucp_worker_h worker, void *data,
m_priv->msg_id = *(uint64_t*)context;

return ucp_eager_common_middle_handler(worker, tag_priv, length + priv_len,
priv_len, tl_flags, flags, priv_len);
priv_len, tl_flags, flags, priv_len,
"tag_offload_eager_middle_handler");
}

/* TODO: can handle multi-fragment messages in a more efficient way by saving
Expand Down Expand Up @@ -397,7 +407,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_eager,

return ucp_eager_offload_handler(wiface->worker, data, length, tl_flags,
flags | UCP_RECV_DESC_FLAG_EAGER_ONLY,
stag);
stag, "tag_offload_unexp_eager");
}

if (!(tl_flags & UCT_CB_PARAM_FLAG_FIRST)) {
Expand Down Expand Up @@ -430,7 +440,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_eager,
priv->req.ep_id = imm;
priv->super.super.tag = stag;
return ucp_eager_tagged_handler(worker, priv, length + priv_len,
tl_flags, flags, priv_len, priv_len);
tl_flags, flags, priv_len, priv_len,
"tag_offload_unexp_eager_sync");
}

static void ucp_eager_dump(ucp_worker_h worker, uct_am_trace_type_t type,
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/tag_rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ ucs_status_t ucp_tag_rndv_process_rts(ucp_worker_h worker,

status = ucp_recv_desc_init(worker, rts_hdr, length, 0, tl_flags,
sizeof(*rts_hdr), UCP_RECV_DESC_FLAG_RNDV, 0, 1,
&rdesc);
"tag_rndv_process_rts", &rdesc);
if (!UCS_STATUS_IS_ERR(status)) {
ucs_assert(ucp_rdesc_get_tag(rdesc) ==
ucp_tag_hdr_from_rts(rts_hdr)->tag);
Expand Down

0 comments on commit f98ff97

Please sign in to comment.