Skip to content

Commit

Permalink
UCP/TAG: Add debug info for recv completion and RTS cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
yosefe committed Sep 14, 2020
1 parent 4a46256 commit 11c6fbf
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 24 deletions.
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,8 @@ static void ucp_ep_cleanup_unexp(ucp_ep_h ep)
rreq = matchq->exp_req;
if (rreq->recv.tag.ep_ptr == (uintptr_t)ep) {
ucs_debug("completing req %p", rreq);
ucp_request_complete_tag_recv(rreq, UCS_ERR_CANCELED);
ucp_request_complete_tag_recv(ep->worker, rreq, UCS_ERR_CANCELED,
"rndv_cancel");
kh_del(ucp_tag_frag_hash, &tm->frag_hash, iter);
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request),
removed = ucp_tag_exp_remove(&worker->tm, req);
/* If tag posted to the transport need to wait its completion */
if (removed && !(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) {
ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED);
ucp_request_complete_tag_recv(worker, req, UCS_ERR_CANCELED,
"user_cancel");
}

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
Expand Down
12 changes: 11 additions & 1 deletion src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,23 @@ ucp_request_complete_send(ucp_request_t *req, ucs_status_t status)
}

/*
 * Complete a tag receive request: trace the completion, optionally record the
 * outcome in the worker's rendezvous debug entry for this request, then
 * complete the request with the given status.
 *
 * worker  Worker whose tm.rndv_debug state is checked and updated.
 * req     Receive request being completed.
 * status  Completion status; also printed via ucs_status_string().
 * state   Label assigned to the debug entry's 'status' field. Only the
 *         pointer is stored here, the string is not copied.
 *
 * NOTE(review): this text comes from a diff view. The one-line signature
 * directly below appears to be the removed (old) form, and the two lines
 * after it the added (new) form - confirm against the real file.
 */
static UCS_F_ALWAYS_INLINE void
ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status)
ucp_request_complete_tag_recv(ucp_worker_h worker, ucp_request_t *req,
ucs_status_t status, const char *state)
{
ucs_trace_req("completing receive request %p (%p) "UCP_REQUEST_FLAGS_FMT
" stag 0x%"PRIx64" len %zu, %s",
req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
req->recv.tag.info.sender_tag, req->recv.tag.info.length,
ucs_status_string(status));

/*
 * Update the debug entry only when rndv_debug.queue_length is non-zero
 * (presumably meaning rndv debug tracking is enabled - TODO confirm).
 * The entry is looked up by the receive request id.
 */
if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
ucp_tag_rndv_debug_entry_t *entry =
ucp_worker_rndv_debug_entry(worker, req->recv.req_id);
entry->send_tag = req->recv.tag.info.sender_tag;
entry->status = state;
entry->recvd_size = req->recv.tag.info.length;
}

/* Emit the profiling event, then complete the request, passing recv.tag.cb
 * and the tag info to ucp_request_complete() */
UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info);
}
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ucp_eager_offload_handler(void *arg, void *data, size_t length,
ucp_eager_expected_handler(worker, req, data, length, recv_tag, flags);
req->recv.tag.info.length = length;
status = ucp_request_recv_data_unpack(req, data, length, 0, 1);
ucp_request_complete_tag_recv(req, status);
ucp_request_complete_tag_recv(worker, req, status, "exp_offload");
status = UCS_OK;
} else {
status = ucp_recv_desc_init(worker, data, length, sizeof(ucp_tag_t),
Expand Down Expand Up @@ -107,7 +107,7 @@ ucp_eager_tagged_handler(void *arg, void *data, size_t length, unsigned am_flags
status = ucp_request_recv_data_unpack(req,
UCS_PTR_BYTE_OFFSET(data, hdr_len),
recv_len, 0, 1);
ucp_request_complete_tag_recv(req, status);
ucp_request_complete_tag_recv(worker, req, status, "eager_only");
} else {
eagerf_hdr = data;
req->recv.tag.info.length =
Expand Down
5 changes: 3 additions & 2 deletions src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ UCS_PROFILE_FUNC_VOID(ucp_tag_offload_completed,
UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, MATCHED);
out:
--req->recv.tag.wiface->post_count;
ucp_request_complete_tag_recv(req, status);
ucp_request_complete_tag_recv(req->recv.worker, req, status, "offload");
}

/* RNDV request matched by the transport. Need to proceed with SW based RNDV */
Expand All @@ -145,7 +145,8 @@ UCS_PROFILE_FUNC_VOID(ucp_tag_offload_rndv_cb,
--req->recv.tag.wiface->post_count;
if (ucs_unlikely(status != UCS_OK)) {
ucp_tag_offload_release_buf(req, 1);
ucp_request_complete_tag_recv(req, status);
ucp_request_complete_tag_recv(req->recv.worker, req, status,
"offload_rndv");
return;
}

Expand Down
52 changes: 39 additions & 13 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,19 @@ static void ucp_rndv_send_frag_atp(ucp_request_t *fsreq, uintptr_t remote_reques
ucp_request_send(fsreq, 0);
}

/*
 * Finish a rendezvous zero-copy receive: call ucp_request_recv_buffer_dereg()
 * on the request, then complete it as a tag receive with the given status,
 * using "rndv_zcopy" as the debug state label.
 *
 * NOTE(review): this text comes from a diff view. The one-line signature
 * directly below and the two-argument completion call in the body appear to
 * be the removed (old) lines; the multi-line signature and the four-argument
 * call appear to be the added (new) lines - confirm against the real file.
 */
static void ucp_rndv_zcopy_recv_req_complete(ucp_request_t *req, ucs_status_t status)
static void
ucp_rndv_zcopy_recv_req_complete(ucp_worker_h worker, ucp_request_t *req,
ucs_status_t status)
{
/* Buffer deregistration is called before the request is completed */
ucp_request_recv_buffer_dereg(req);
ucp_request_complete_tag_recv(req, status);
ucp_request_complete_tag_recv(worker, req, status, "rndv_zcopy");
}

static void ucp_rndv_complete_rma_get_zcopy(ucp_request_t *rndv_req,
ucs_status_t status)
{
ucp_request_t *rreq = rndv_req->send.rndv_get.rreq;
ucp_worker_h worker = rndv_req->send.ep->worker;

ucs_assertv(rndv_req->send.state.dt.offset == rndv_req->send.length,
"rndv_req=%p offset=%zu length=%zu", rndv_req,
Expand All @@ -537,6 +540,12 @@ static void ucp_rndv_complete_rma_get_zcopy(ucp_request_t *rndv_req,
ucs_status_string(status));
UCS_PROFILE_REQUEST_EVENT(rreq, "complete_rndv_get", 0);

if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
ucp_tag_rndv_debug_entry_t *entry =
ucp_worker_rndv_debug_entry(worker, rndv_req->send.rndv_req_id);
entry->status = "rndv_completed";
}

ucp_rkey_destroy(rndv_req->send.rndv_get.rkey);
ucp_request_send_buffer_dereg(rndv_req);

Expand All @@ -548,7 +557,7 @@ static void ucp_rndv_complete_rma_get_zcopy(ucp_request_t *rndv_req,
ucp_request_put(rndv_req);
}

ucp_rndv_zcopy_recv_req_complete(rreq, status);
ucp_rndv_zcopy_recv_req_complete(worker, rreq, status);
}

static void ucp_rndv_recv_data_init(ucp_request_t *rreq, size_t size)
Expand Down Expand Up @@ -994,7 +1003,7 @@ static unsigned ucp_rndv_progress_rkey_ptr(void *arg)
ucp_rndv_rkey_ptr_rreq_advance(rreq, seg_size);
if (ucs_unlikely(status != UCS_OK) ||
(rreq->recv.state.offset == rndv_req->send.length)) {
ucp_request_complete_tag_recv(rreq, status);
ucp_request_complete_tag_recv(worker, rreq, status, "rkey_ptr");
ucp_rkey_destroy(rndv_req->send.rndv_get.rkey);
ucp_rndv_req_send_ats(rndv_req, rreq,
rndv_req->send.rndv_get.remote_request, status);
Expand Down Expand Up @@ -1053,7 +1062,7 @@ static void ucp_rndv_do_rkey_ptr(ucp_request_t *rndv_req, ucp_request_t *rreq,
&rkey->tl_rkey[rkey_index].rkey,
rndv_rts_hdr->address, &local_ptr);
if (status != UCS_OK) {
ucp_request_complete_tag_recv(rreq, status);
ucp_request_complete_tag_recv(worker, rreq, status, "rkey_ptr");
ucp_rkey_destroy(rkey);
ucp_rndv_req_send_ats(rndv_req, rreq, rndv_rts_hdr->sreq.reqptr, status);
return;
Expand Down Expand Up @@ -1109,7 +1118,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr, rts_seq),

ep = ucp_worker_get_ep_by_ptr(worker, rndv_rts_hdr->sreq.ep_ptr);
if (ep == NULL) {
ucp_request_complete_tag_recv(rreq, UCS_ERR_CANCELED);
ucp_request_complete_tag_recv(worker, rreq, UCS_ERR_CANCELED, "rndv_no_ep");
goto out;
}

Expand Down Expand Up @@ -1138,7 +1147,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr, rts_seq),
rndv_rts_hdr->size, rreq->recv.length, rreq);
ucp_rndv_req_send_ats(rndv_req, rreq, rndv_rts_hdr->sreq.reqptr, UCS_OK);
ucp_request_recv_generic_dt_finish(rreq);
ucp_rndv_zcopy_recv_req_complete(rreq, UCS_ERR_MESSAGE_TRUNCATED);
ucp_rndv_zcopy_recv_req_complete(worker, rreq, UCS_ERR_MESSAGE_TRUNCATED);
goto out;
}

Expand Down Expand Up @@ -1210,12 +1219,28 @@ static void ucp_rndv_send_cancel_ack(ucp_worker_h worker,
}

static void ucp_rndv_unexp_cancel(ucp_worker_h worker,
ucp_rndv_rts_hdr_t *rndv_rts_hdr)
ucp_rndv_rts_hdr_t *rndv_rts_hdr,
uint64_t rts_seq)
{
const ucp_rndv_rts_hdr_t *rdesc_rts_hdr;

ucp_tag_rndv_debug_entry_t *entry;
ucp_recv_desc_t *rdesc;
ucs_list_link_t *list;
uint64_t req_id;

req_id = worker->rndv_req_id++;

if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
entry = ucp_worker_rndv_debug_entry(worker, req_id);
entry->type = "rndv_cancel";
entry->rts_seq = rts_seq;
entry->send_tag = rndv_rts_hdr->super.tag;
entry->ep = ucp_worker_get_ep_by_ptr(worker,
rndv_rts_hdr->sreq.ep_ptr);
entry->remote_reqptr = rndv_rts_hdr->sreq.reqptr;
entry->remote_address = rndv_rts_hdr->address;
entry->size = rndv_rts_hdr->size;
}

list = ucp_tag_unexp_get_list_for_tag(&worker->tm, rndv_rts_hdr->super.tag);
ucs_list_for_each(rdesc, list, tag_list[UCP_RDESC_HASH_LIST]) {
Expand Down Expand Up @@ -1249,7 +1274,7 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length,
seq = worker->rndv_rts_recv_seq++;

if (rndv_rts_hdr->status == UCS_ERR_CANCELED) {
ucp_rndv_unexp_cancel(worker, rndv_rts_hdr);
ucp_rndv_unexp_cancel(worker, rndv_rts_hdr, seq);
return UCS_OK;
}

Expand Down Expand Up @@ -1514,7 +1539,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_frag_recv_put_completion, (self, status),
ucp_request_put(freq);

if (req->recv.tag.remaining == 0) {
ucp_request_complete_tag_recv(req, UCS_OK);
ucp_request_complete_tag_recv(req->send.ep->worker, req, UCS_OK,
"freq");
}
}

Expand Down Expand Up @@ -1676,10 +1702,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_atp_handler,
(arg, data, length, flags),
void *arg, void *data, size_t length, unsigned flags)
{
ucp_worker_h worker = arg;
ucp_reply_hdr_t *rep_hdr = data;
ucp_request_t *req;
ucp_request_t *rreq;
ucp_worker_h worker;
ucp_lane_index_t mem_type_rma_lane;
ucp_mem_desc_t *mdesc;
ucp_md_index_t md_index;
Expand Down Expand Up @@ -1731,7 +1757,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_atp_handler,
ucp_request_send(req, 0);
} else {
UCS_PROFILE_REQUEST_EVENT(req, "rndv_atp_recv", 0);
ucp_rndv_zcopy_recv_req_complete(req, UCS_OK);
ucp_rndv_zcopy_recv_req_complete(worker, req, UCS_OK);
}

return UCS_OK;
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/tag/tag_match.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ KHASH_INIT(ucp_tag_frag_hash, uint64_t, ucp_tag_frag_match_t, 1,

typedef struct ucp_tag_rndv_debug_entry {
const char *type;
const char *status;
uint64_t id;
uint64_t rts_seq;
ucp_ep_h ep;
Expand All @@ -66,6 +67,7 @@ typedef struct ucp_tag_rndv_debug_entry {
uintptr_t remote_reqptr;
void *local_address;
size_t size;
size_t recvd_size;
ucp_request_t *rndv_get_req;
ucp_request_t *send_req;
ucp_request_t *recv_req;
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/tag/tag_match.inl
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ ucp_tag_request_process_recv_data(ucp_request_t *req, const void *data,
if (dereg) {
ucp_request_recv_buffer_dereg(req);
}
ucp_request_complete_tag_recv(req, status);
ucp_request_complete_tag_recv(req->recv.worker, req, status,
"tag_data_last");
ucs_assert(status != UCS_INPROGRESS);
return status;
} else {
Expand Down
15 changes: 12 additions & 3 deletions src/ucp/tag/tag_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@


static UCS_F_ALWAYS_INLINE void
ucp_tag_recv_request_completed(ucp_request_t *req, ucs_status_t status,
ucp_tag_recv_info_t *info, const char *function)
ucp_tag_recv_request_completed(ucp_worker_h worker, ucp_request_t *req,
ucs_status_t status, ucp_tag_recv_info_t *info,
const char *function)
{
ucs_trace_req("%s returning completed request %p (%p) stag 0x%"PRIx64" len %zu, %s",
function, req, req + 1, info->sender_tag, info->length,
ucs_status_string(status));

if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
ucp_tag_rndv_debug_entry_t *entry =
ucp_worker_rndv_debug_entry(worker, req->recv.req_id);
entry->send_tag = info->sender_tag;
entry->status = "recv_completed1";
entry->recvd_size = info->length;
}

req->status = status;
if ((req->flags |= UCP_REQUEST_FLAG_COMPLETED) & UCP_REQUEST_FLAG_RELEASED) {
ucp_request_put(req);
Expand Down Expand Up @@ -111,7 +120,7 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count,
if (req_flags & UCP_REQUEST_FLAG_CALLBACK) {
cb(req + 1, status, &req->recv.tag.info);
}
ucp_tag_recv_request_completed(req, status, &req->recv.tag.info,
ucp_tag_recv_request_completed(worker, req, status, &req->recv.tag.info,
debug_name);
return;
}
Expand Down

0 comments on commit 11c6fbf

Please sign in to comment.