Skip to content

Commit

Permalink
UCP/RNDV: Init callbacks to NULL, add rndv-cancel debug
Browse files Browse the repository at this point in the history
  • Loading branch information
yosefe committed Sep 17, 2020
1 parent 609a9e7 commit 7693cde
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ ucp_request_put(ucp_request_t *req)
{
ucs_trace_req("put request %p", req);
UCS_PROFILE_REQUEST_FREE(req);
req->send.cb = NULL;
req->recv.tag.cb = NULL;
req->recv.stream.cb = NULL;
ucs_mpool_put_inline(req);
}

Expand Down
22 changes: 12 additions & 10 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ static int ucp_rndv_is_recv_pipeline_needed(ucp_request_t *rndv_req,
return 1;
}

static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status)
static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status,
const char *debug_status)
{
ucp_worker_h worker;
khiter_t iter;
Expand All @@ -62,7 +63,7 @@ static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status)
sreq, sreq->send.rndv_req_id, worker);
}

ucp_send_request_update_data(sreq, "rndv_done");
ucp_send_request_update_data(sreq, debug_status);
ucp_request_complete_send(sreq, status);
}

Expand Down Expand Up @@ -233,7 +234,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rts, (self),
return UCS_ERR_NO_RESOURCE;
} else {
ucs_assert(UCS_STATUS_IS_ERR(status));
ucp_rndv_complete_send(sreq, status);
ucp_rndv_complete_send(sreq, status, "rts_cancel");
return UCS_OK;
}
}
Expand Down Expand Up @@ -267,7 +268,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self),
} else if (packed_len == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
} else {
ucp_rndv_complete_send(sreq, (ucs_status_t)packed_len);
ucp_rndv_complete_send(sreq, (ucs_status_t)packed_len,
"progress_rndv_cancel");
return UCS_OK;
}
}
Expand Down Expand Up @@ -401,7 +403,7 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)
void ucp_tag_rndv_cancel(ucp_request_t *sreq)
{
if (!(sreq->send.ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED)) {
ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED);
ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED, "rndv_cancel");
} else {
sreq->send.uct.func = ucp_proto_progress_rndv_cancel;
if (sreq->flags & UCP_REQUEST_FLAG_RNDV_RTS_SENT) {
Expand All @@ -417,7 +419,8 @@ void ucp_ep_complete_rndv_reqs(ucp_ep_h ep)

ucs_list_for_each_safe(sreq, tmp, &worker->rndv_reqs_list, send.list) {
if (sreq->send.ep == ep) {
ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED);
ucp_rndv_complete_send(sreq, UCS_ERR_CANCELED,
"ep_closed_rndv_cancel");
}
}
}
Expand Down Expand Up @@ -1363,7 +1366,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler,
if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) {
ucp_tag_offload_cancel_rndv(sreq);
}
ucp_rndv_complete_send(sreq, rep_hdr->status);
ucp_rndv_complete_send(sreq, rep_hdr->status, "ats_recv");
return UCS_OK;
}

Expand Down Expand Up @@ -1404,7 +1407,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_am_bcopy, (self),
ucp_rndv_pack_data, 1);
}
if (status == UCS_OK) {
ucp_rndv_complete_send(sreq, UCS_OK);
ucp_rndv_complete_send(sreq, UCS_OK, "rndv_am_bcopy_done");
} else if (status == UCP_STATUS_PENDING_SWITCH) {
status = UCS_OK;
}
Expand Down Expand Up @@ -1477,8 +1480,7 @@ static void ucp_rndv_am_zcopy_send_req_complete(ucp_request_t *req,
ucs_status_t status)
{
ucs_assert(req->send.state.uct_comp.count == 0);
ucp_request_send_buffer_dereg(req);
ucp_request_complete_send(req, status);
ucp_rndv_complete_send(req, status, "rndv_zcopy_complete");
}

static void ucp_rndv_am_zcopy_completion(uct_completion_t *self,
Expand Down
1 change: 1 addition & 0 deletions test/apps/iodemo/ucx_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ bool UcxConnection::process_request(const char *what,
ucx_request *r = reinterpret_cast<ucx_request*>(ptr_status);
if (r->completed) {
// already completed by callback
assert(ucp_request_is_completed(r));
status = r->status;
(*callback)(status);
UcxContext::request_release(r);
Expand Down

0 comments on commit 7693cde

Please sign in to comment.