Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/CORE/RNDV/GTEST: Drop packets with invalid ID and fix handling of status in RNDV RTS/RTR/data [v1.10.x] #6187

Merged
merged 2 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,17 @@ static void ucp_am_rndv_send_ats(ucp_worker_h worker,
ucs_status_t status)
{
ucp_request_t *req;
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, return,
"AM RNDV ATS");
req = ucp_request_get(worker);
if (ucs_unlikely(req == NULL)) {
ucs_error("failed to allocate request for AM RNDV ATS");
return;
}

req->send.ep = ucp_worker_get_ep_by_id(worker, rts->super.sreq.ep_id);
req->send.ep = ep;
req->flags = 0;

ucp_rndv_req_send_ats(req, NULL, rts->super.sreq.req_id, status);
Expand Down Expand Up @@ -735,14 +738,16 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_am_rndv_rts, (self),
{
ucp_request_t *sreq = ucs_container_of(self, ucp_request_t, send.uct);
size_t max_rts_size;
ucs_status_t status;

/* RTS consists of: AM RTS header, packed rkeys and user header */
max_rts_size = sizeof(ucp_am_rndv_rts_hdr_t) +
ucp_ep_config(sreq->send.ep)->rndv.rkey_size +
sreq->send.msg_proto.am.header_length;

return ucp_do_am_single(self, UCP_AM_ID_RNDV_RTS, ucp_am_rndv_rts_pack,
max_rts_size);
status = ucp_do_am_single(self, UCP_AM_ID_RNDV_RTS, ucp_am_rndv_rts_pack,
max_rts_size);
return ucp_rndv_rts_handle_status_from_pending(sreq, status);
}

static ucs_status_t ucp_am_send_start_rndv(ucp_request_t *sreq)
Expand Down Expand Up @@ -1170,8 +1175,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_handler_reply,
ucp_worker_h worker = (ucp_worker_h)am_arg;
ucp_ep_h reply_ep;

reply_ep = UCP_WORKER_GET_EP_BY_ID(worker, hdr->ep_id, "AM (reply proto)",
return UCS_OK);
reply_ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, hdr->ep_id, return UCS_OK,
"AM (reply proto)");

return ucp_am_handler_common(worker, &hdr->super, sizeof(*hdr),
am_length, reply_ep, am_flags,
Expand Down Expand Up @@ -1299,8 +1304,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_first_handler,
size_t remaining;
uint64_t recv_flags;

ep = UCP_WORKER_GET_EP_BY_ID(worker, first_hdr->super.ep_id,
"AM first fragment", return UCS_OK);
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, first_hdr->super.ep_id,
return UCS_OK, "AM first fragment");
remaining = first_hdr->total_size - (am_length - sizeof(*first_hdr));

if (ucs_unlikely(remaining == 0)) {
Expand Down Expand Up @@ -1372,8 +1377,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_middle_handler,
ucp_ep_h ep;
ucs_status_t status;

ep = UCP_WORKER_GET_EP_BY_ID(worker, mid_hdr->ep_id,
"AM middle fragment", return UCS_OK);
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, mid_hdr->ep_id,
return UCS_OK, "AM middle fragment");
ep_ext = ucp_ep_ext_proto(ep);
first_rdesc = ucp_am_find_first_rdesc(worker, ep_ext, msg_id);
if (first_rdesc != NULL) {
Expand Down Expand Up @@ -1415,10 +1420,11 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length,
ucs_status_t status, desc_status;
void *hdr;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, "AM RTS",
{ status = UCS_ERR_ENDPOINT_TIMEOUT;
goto out_send_ats;
});
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, rts->super.sreq.ep_id,
{ status = UCS_ERR_CANCELED;
goto out_send_ats;
},
"AM RTS");

if (ucs_unlikely(!ucp_am_recv_check_id(worker, am_id))) {
status = UCS_ERR_INVALID_PARAM;
Expand Down
19 changes: 14 additions & 5 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,31 @@ ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,

void ucp_ep_delete(ucp_ep_h ep)
{
ucs_status_t status;

ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
ucp_wireup_msg_ack_cb_pred, ep);
if (!(ep->flags & UCP_EP_FLAG_INTERNAL)) {
ucp_worker_keepalive_remove_ep(ep);
}

ucs_list_del(&ucp_ep_ext_gen(ep)->ep_list);
if (!(ep->flags & UCP_EP_FLAG_FAILED)) {
ucp_ep_release_id(ep);
}

ucp_ep_destroy_base(ep);
}

void ucp_ep_release_id(ucp_ep_h ep)
{
ucs_status_t status;

ucs_assert(!(ep->flags & UCP_EP_FLAG_FAILED));

status = ucs_ptr_map_del(&ep->worker->ptr_map, ucp_ep_local_id(ep));
if (status != UCS_OK) {
ucs_warn("ep %p local id 0x%"PRIxPTR": ucs_ptr_map_del failed with status %s",
ucs_warn("ep %p local id 0x%" PRIxPTR ": ucs_ptr_map_del failed: %s",
ep, ucp_ep_local_id(ep), ucs_status_string(status));
}

ucp_ep_destroy_base(ep);
}

ucs_status_t
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,

void ucp_ep_delete(ucp_ep_h ep);

void ucp_ep_release_id(ucp_ep_h ep);

ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
ucp_wireup_ep_t **wireup_ep);

Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep,
goto out_ok;
}

ucp_ep_release_id(ucp_ep);
ucp_ep->flags |= UCP_EP_FLAG_FAILED;

if (ucp_ep_config(ucp_ep)->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) {
Expand Down
46 changes: 33 additions & 13 deletions src/ucp/core/ucp_worker.inl
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ ucp_worker_get_request_id(ucp_worker_h worker, ucp_request_t *req, int indirect)
static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_worker_get_request_by_id(ucp_worker_h worker, ucs_ptr_map_key_t id)
{
ucp_request_t* request;

request = (ucp_request_t*)ucs_ptr_map_get(&worker->ptr_map, id);
ucs_assert(request != NULL);
return request;
return (ucp_request_t*)ucs_ptr_map_get(&worker->ptr_map, id);
}

static UCS_F_ALWAYS_INLINE void
Expand Down Expand Up @@ -265,17 +261,41 @@ ucp_worker_get_rkey_config(ucp_worker_h worker, const ucp_rkey_config_key_t *key
return ucp_worker_add_rkey_config(worker, key, cfg_index_p);
}

#define UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _str, _action) \
#define UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _action, _fmt_str, ...) \
({ \
ucp_ep_h __ep = ucp_worker_get_ep_by_id(_worker, _ep_id); \
if (ucs_unlikely(__ep == NULL)) { \
ucs_trace_data("worker %p: ep id 0x%" PRIx64 " was not found, drop" \
_fmt_str, _worker, _ep_id, ##__VA_ARGS__); \
_action; \
} \
__ep; \
})

#define UCP_WORKER_GET_VALID_EP_BY_ID(_worker, _ep_id, _action, _fmt_str, ...) \
({ \
ucp_ep_h ___ep = UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _action, \
_fmt_str, ##__VA_ARGS__); \
if (ucs_unlikely((___ep != NULL) && \
(___ep->flags & UCP_EP_FLAG_CLOSED))) { \
ucs_trace_data("worker %p: ep id 0x%" PRIx64 " was already closed" \
" ep %p, drop " _fmt_str, _worker, _ep_id, ___ep, \
##__VA_ARGS__); \
_action; \
} \
___ep; \
})

#define UCP_WORKER_GET_REQ_BY_ID(_worker, _req_id, _action, _fmt_str, ...) \
({ \
ucp_ep_h _ep = ucp_worker_get_ep_by_id(_worker, _ep_id); \
if (ucs_unlikely((_ep == NULL) || \
((_ep)->flags & (UCP_EP_FLAG_CLOSED | \
UCP_EP_FLAG_FAILED)))) { \
ucs_trace_data("worker %p: drop %s on closed/failed ep %p", \
_worker, _str, _ep); \
ucp_request_t *_req = ucp_worker_get_request_by_id(_worker, _req_id); \
if (ucs_unlikely(_req == NULL)) { \
ucs_trace_data("worker %p: req id 0x%" PRIx64 " doesn't exist" \
" drop " _fmt_str, _worker, _req_id, \
##__VA_ARGS__); \
_action; \
} \
_ep; \
_req; \
})

#endif
16 changes: 10 additions & 6 deletions src/ucp/proto/proto_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,17 @@ ucp_do_am_single(uct_pending_req_t *self, uint8_t am_id,
ucs_status_t ucp_proto_progress_am_single(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucs_status_t status = ucp_do_am_single(self, req->send.proto.am_id,
ucp_proto_pack,
ucp_proto_max_packed_size());
if (status == UCS_OK) {
req->send.proto.comp_cb(req);
ucs_status_t status;

status = ucp_do_am_single(self, req->send.proto.am_id, ucp_proto_pack,
ucp_proto_max_packed_size());
if (ucs_unlikely(status == UCS_ERR_NO_RESOURCE)) {
return UCS_ERR_NO_RESOURCE;
}
return status;

/* TODO: handle failure */
req->send.proto.comp_cb(req);
return UCS_OK;
}

void ucp_proto_am_zcopy_req_complete(ucp_request_t *req, ucs_status_t status)
Expand Down
43 changes: 23 additions & 20 deletions src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@

#define UCP_STATUS_PENDING_SWITCH (UCS_ERR_LAST - 1)

#define UCP_AM_BCOPY_HANDLE_STATUS(_multi, _status) \
do { \
if (_multi) { \
if (_status == UCS_INPROGRESS) { \
return UCS_INPROGRESS; \
} else if (ucs_unlikely(_status == UCP_STATUS_PENDING_SWITCH)) { \
return UCS_OK; \
} \
} else { \
ucs_assert(_status != UCS_INPROGRESS); \
} \
\
if (ucs_unlikely(_status == UCS_ERR_NO_RESOURCE)) { \
return UCS_ERR_NO_RESOURCE; \
} \
} while (0)


typedef void (*ucp_req_complete_func_t)(ucp_request_t *req, ucs_status_t status);


Expand Down Expand Up @@ -518,17 +536,16 @@ ucp_proto_get_short_max(const ucp_request_t *req,
}

static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, ucs_ptr_map_key_t ep_id)
ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, ucp_ep_h ep)
{
ucp_request_t *req;

req = ucp_request_get(worker);
ucp_request_t *req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate UCP request");
return NULL;
}

req->flags = 0;
req->send.ep = ucp_worker_get_ep_by_id(worker, ep_id);
req->send.ep = ep;
req->send.uct.func = ucp_proto_progress_am_single;
req->send.proto.comp_cb = ucp_request_put;
req->send.proto.status = UCS_OK;
Expand All @@ -553,21 +570,7 @@ ucp_am_bcopy_handle_status_from_pending(uct_pending_req_t *self, int multi,
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);

if (multi) {
if (status == UCS_INPROGRESS) {
return UCS_INPROGRESS;
}

if (ucs_unlikely(status == UCP_STATUS_PENDING_SWITCH)) {
return UCS_OK;
}
} else {
ucs_assert(status != UCS_INPROGRESS);
}

if (ucs_unlikely(status == UCS_ERR_NO_RESOURCE)) {
return UCS_ERR_NO_RESOURCE;
}
UCP_AM_BCOPY_HANDLE_STATUS(multi, status);

ucp_request_send_generic_dt_finish(req);
if (tag_sync) {
Expand Down
8 changes: 6 additions & 2 deletions src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_req_handler, (arg, data, length, am_fl
{
ucp_atomic_req_hdr_t *atomicreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
atomicreqh->req.ep_id);
ucp_rsc_index_t amo_rsc_idx = ucs_ffs64_safe(worker->atomic_tls);
ucp_request_t *req;
ucp_ep_h ep;

/* allow getting closed EP to be used for sending a completion or AMO data to
* enable flush on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, atomicreqh->req.ep_id, return UCS_OK,
"SW AMO request");
if (ucs_unlikely((amo_rsc_idx != UCP_MAX_RESOURCES) &&
(ucp_worker_iface_get_attr(worker,
amo_rsc_idx)->cap.flags &
Expand Down
32 changes: 25 additions & 7 deletions src/ucp/rma/rma_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,16 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_put_handler, (arg, data, length, am_flags),
{
ucp_put_hdr_t *puth = data;
ucp_worker_h worker = arg;
ucp_ep_h ep;

/* allow getting closed EP to be used for sending a completion to enable flush
* on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, puth->ep_id, return UCS_OK,
"SW PUT request");
ucp_dt_contig_unpack(worker, (void*)puth->address, puth + 1,
length - sizeof(*puth), puth->mem_type);
ucp_rma_sw_send_cmpl(ucp_worker_get_ep_by_id(worker, puth->ep_id));
ucp_rma_sw_send_cmpl(ep);
return UCS_OK;
}

Expand All @@ -160,8 +166,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rma_cmpl_handler, (arg, data, length, am_flag
{
ucp_cmpl_hdr_t *putackh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker, putackh->ep_id);
ucp_ep_h ep;

/* allow getting closed EP to be used for handling a completion to enable flush
* on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, putackh->ep_id, return UCS_OK,
"SW RMA completion");
ucp_ep_rma_remote_request_completed(ep);
return UCS_OK;
}
Expand Down Expand Up @@ -214,10 +225,14 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_req_handler, (arg, data, length, am_flags
{
ucp_get_req_hdr_t *getreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
getreqh->req.ep_id);
ucp_ep_h ep;
ucp_request_t *req;

/* allow getting closed EP to be used for sending a GET operation data to enable
* flush on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, getreqh->req.ep_id, return UCS_OK,
"SW GET request");
req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate get reply");
Expand Down Expand Up @@ -246,10 +261,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_rep_handler, (arg, data, length, am_flags
ucp_worker_h worker = arg;
ucp_rma_rep_hdr_t *getreph = data;
size_t frag_length = length - sizeof(*getreph);
ucp_request_t *req = ucp_worker_get_request_by_id(worker,
getreph->req_id);
ucp_ep_h ep = req->send.ep;
ucp_request_t *req;
ucp_ep_h ep;

req = UCP_WORKER_GET_REQ_BY_ID(worker, getreph->req_id,
return UCS_OK,
"GET reply data %p", getreph);
ep = req->send.ep;
if (ep->worker->context->config.ext.proto_enable) {
// TODO use dt_iter.inl unpack
ucp_dt_contig_unpack(ep->worker,
Expand Down
Loading