Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/CORE: Drop packets with invalid REQ or UCP EP IDs #6001

Merged
merged 1 commit into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,17 @@ static void ucp_am_rndv_send_ats(ucp_worker_h worker,
ucs_status_t status)
{
ucp_request_t *req;
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, return,
"AM RNDV ATS");
req = ucp_request_get(worker);
if (ucs_unlikely(req == NULL)) {
ucs_error("failed to allocate request for AM RNDV ATS");
return;
}

req->send.ep = ucp_worker_get_ep_by_id(worker, rts->super.sreq.ep_id);
req->send.ep = ep;
req->flags = 0;

ucp_rndv_req_send_ats(req, NULL, rts->super.sreq.req_id, status);
Expand Down Expand Up @@ -1227,8 +1230,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_handler_reply,
ucp_worker_h worker = (ucp_worker_h)am_arg;
ucp_ep_h reply_ep;

reply_ep = UCP_WORKER_GET_EP_BY_ID(worker, hdr->ep_id, "AM (reply proto)",
return UCS_OK);
reply_ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, hdr->ep_id, return UCS_OK,
"AM (reply proto)");

return ucp_am_handler_common(worker, &hdr->super, sizeof(*hdr),
am_length, reply_ep, am_flags,
Expand Down Expand Up @@ -1356,8 +1359,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_first_handler,
size_t remaining;
uint64_t recv_flags;

ep = UCP_WORKER_GET_EP_BY_ID(worker, first_hdr->super.ep_id,
"AM first fragment", return UCS_OK);
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, first_hdr->super.ep_id,
return UCS_OK, "AM first fragment");
remaining = first_hdr->total_size - (am_length - sizeof(*first_hdr));

if (ucs_unlikely(remaining == 0)) {
Expand Down Expand Up @@ -1429,8 +1432,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_middle_handler,
ucp_ep_h ep;
ucs_status_t status;

ep = UCP_WORKER_GET_EP_BY_ID(worker, mid_hdr->ep_id,
"AM middle fragment", return UCS_OK);
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, mid_hdr->ep_id,
return UCS_OK, "AM middle fragment");
ep_ext = ucp_ep_ext_proto(ep);
first_rdesc = ucp_am_find_first_rdesc(worker, ep_ext, msg_id);
if (first_rdesc != NULL) {
Expand Down Expand Up @@ -1472,10 +1475,11 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length,
ucs_status_t status, desc_status;
void *hdr;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, "AM RTS",
{ status = UCS_ERR_ENDPOINT_TIMEOUT;
goto out_send_ats;
});
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, rts->super.sreq.ep_id,
{ status = UCS_ERR_CANCELED;
goto out_send_ats;
},
"AM RTS");

if (ucs_unlikely(!ucp_am_recv_check_id(worker, am_id))) {
status = UCS_ERR_INVALID_PARAM;
Expand Down
19 changes: 14 additions & 5 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,22 +211,31 @@ ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,

/*
 * Detach an endpoint from its worker and destroy it.
 *
 * Steps, in order:
 *  - drop callbacks matching ucp_wireup_msg_ack_cb_pred for this EP from the
 *    worker's progress queue
 *  - remove the EP from keepalive tracking, unless it is an internal EP
 *  - unlink the EP from the list it is a member of
 *  - release the EP's local ID, unless the EP is flagged as failed
 *  - destroy the base EP object
 */
static void ucp_ep_delete(ucp_ep_h ep)
{
    ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
                            ucp_wireup_msg_ack_cb_pred, ep);
    if (!(ep->flags & UCP_EP_FLAG_INTERNAL)) {
        ucp_worker_keepalive_remove_ep(ep);
    }

    ucs_list_del(&ucp_ep_ext_gen(ep)->ep_list);

    /* ucp_ep_release_id() asserts that the EP is not flagged as failed, so it
     * must be skipped for failed EPs */
    if (!(ep->flags & UCP_EP_FLAG_FAILED)) {
        ucp_ep_release_id(ep);
    }

    ucp_ep_destroy_base(ep);
}

/*
 * Remove the endpoint's local ID from the worker's pointer map.
 *
 * Must not be called on an EP that already has UCP_EP_FLAG_FAILED set (this
 * is asserted). A failure to delete the ID is reported as a warning only; the
 * function does not return a status and does not destroy the EP itself.
 */
void ucp_ep_release_id(ucp_ep_h ep)
{
    ucs_status_t status;

    ucs_assert(!(ep->flags & UCP_EP_FLAG_FAILED));

    status = ucs_ptr_map_del(&ep->worker->ptr_map, ucp_ep_local_id(ep));
    if (status != UCS_OK) {
        ucs_warn("ep %p local id 0x%" PRIxPTR ": ucs_ptr_map_del failed: %s",
                 ep, ucp_ep_local_id(ep), ucs_status_string(status));
    }
}

void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,
const char *peer_name, const char *message,
ucp_ep_h *ep_p);

/**
 * Remove the local ID of @a ep from its worker's pointer map.
 *
 * @param [in] ep  Endpoint whose local ID is released. It must not have
 *                 UCP_EP_FLAG_FAILED set (the implementation asserts this).
 */
void ucp_ep_release_id(ucp_ep_h ep);

ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
ucp_wireup_ep_t **wireup_ep);

Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep,
goto out_ok;
}

ucp_ep_release_id(ucp_ep);
ucp_ep->flags |= UCP_EP_FLAG_FAILED;

if (ucp_ep_config(ucp_ep)->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) {
Expand Down
46 changes: 33 additions & 13 deletions src/ucp/core/ucp_worker.inl
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ ucp_worker_get_request_id(ucp_worker_h worker, ucp_request_t *req, int indirect)
/*
 * Look up a request by its ID in the worker's pointer map.
 *
 * The result of ucs_ptr_map_get() is returned unchanged and is not asserted
 * to be non-NULL, so callers have to validate it themselves (for example via
 * UCP_WORKER_GET_REQ_BY_ID, which checks for NULL and drops the packet).
 */
static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_worker_get_request_by_id(ucp_worker_h worker, ucs_ptr_map_key_t id)
{
    return (ucp_request_t*)ucs_ptr_map_get(&worker->ptr_map, id);
}

static UCS_F_ALWAYS_INLINE void
Expand Down Expand Up @@ -262,17 +258,41 @@ ucp_worker_get_rkey_config(ucp_worker_h worker, const ucp_rkey_config_key_t *key
return ucp_worker_add_rkey_config(worker, key, cfg_index_p);
}

/*
 * Look up an endpoint by its ID on the given worker.
 *
 * Evaluates to the ucp_ep_h returned by ucp_worker_get_ep_by_id(). If that
 * is NULL, a data-trace message is emitted and _action is executed; callers
 * pass a statement such as "return UCS_OK" or a block ending in "goto", so
 * the NULL value is normally never used.
 *
 * _fmt_str and the optional variadic arguments are appended to the trace
 * message to describe what is being dropped. The flags of the EP are not
 * inspected here, so a closed EP is still returned.
 *
 * Note: _worker and _ep_id are expanded more than once.
 */
#define UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _action, _fmt_str, ...) \
    ({ \
        ucp_ep_h __ep = ucp_worker_get_ep_by_id(_worker, _ep_id); \
        if (ucs_unlikely(__ep == NULL)) { \
            ucs_trace_data("worker %p: ep id 0x%" PRIx64 " was not found, drop " \
                           _fmt_str, _worker, _ep_id, ##__VA_ARGS__); \
            _action; \
        } \
        __ep; \
    })

/*
 * Look up an endpoint by its ID and additionally reject closed endpoints.
 *
 * The lookup itself is delegated to UCP_WORKER_GET_EP_BY_ID, which executes
 * _action when no EP is found. If an EP is found but has UCP_EP_FLAG_CLOSED
 * set, a data-trace message is emitted and _action is executed as well.
 * Otherwise the macro evaluates to the EP.
 *
 * The local is named ___ep (three underscores) so that it does not clash
 * with the __ep local declared inside the nested UCP_WORKER_GET_EP_BY_ID.
 */
#define UCP_WORKER_GET_VALID_EP_BY_ID(_worker, _ep_id, _action, _fmt_str, ...) \
({ \
ucp_ep_h ___ep = UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _action, \
_fmt_str, ##__VA_ARGS__); \
if (ucs_unlikely((___ep != NULL) && \
(___ep->flags & UCP_EP_FLAG_CLOSED))) { \
ucs_trace_data("worker %p: ep id 0x%" PRIx64 " was already closed" \
" ep %p, drop " _fmt_str, _worker, _ep_id, ___ep, \
##__VA_ARGS__); \
_action; \
} \
___ep; \
})

/*
 * Look up a request by its ID on the given worker.
 *
 * Evaluates to the ucp_request_t* returned by ucp_worker_get_request_by_id().
 * If that is NULL, a data-trace message is emitted and _action is executed;
 * callers pass a statement such as "return UCS_OK", so the NULL value is
 * normally never used.
 *
 * _fmt_str and the optional variadic arguments are appended to the trace
 * message to describe what is being dropped.
 *
 * Note: _worker and _req_id are expanded more than once.
 */
#define UCP_WORKER_GET_REQ_BY_ID(_worker, _req_id, _action, _fmt_str, ...) \
    ({ \
        ucp_request_t *_req = ucp_worker_get_request_by_id(_worker, _req_id); \
        if (ucs_unlikely(_req == NULL)) { \
            ucs_trace_data("worker %p: req id 0x%" PRIx64 " doesn't exist" \
                           " drop " _fmt_str, _worker, _req_id, \
                           ##__VA_ARGS__); \
            _action; \
        } \
        _req; \
    })

#endif
9 changes: 4 additions & 5 deletions src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -527,17 +527,16 @@ ucp_proto_is_inline(ucp_ep_h ep, const ucp_memtype_thresh_t *max_eager_short,
}

static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, ucs_ptr_map_key_t ep_id)
ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, ucp_ep_h ep)
{
ucp_request_t *req;

req = ucp_request_get(worker);
ucp_request_t *req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate UCP request");
return NULL;
}

req->flags = 0;
req->send.ep = ucp_worker_get_ep_by_id(worker, ep_id);
req->send.ep = ep;
req->send.uct.func = ucp_proto_progress_am_single;
req->send.proto.comp_cb = ucp_request_put;
req->send.proto.status = UCS_OK;
Expand Down
8 changes: 6 additions & 2 deletions src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_req_handler, (arg, data, length, am_fl
{
ucp_atomic_req_hdr_t *atomicreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
atomicreqh->req.ep_id);
ucp_rsc_index_t amo_rsc_idx = ucs_ffs64_safe(worker->atomic_tls);
ucp_request_t *req;
ucp_ep_h ep;

/* A closed EP is intentionally accepted here: it is still used for sending a
 * completion or AMO data, which enables a flush on the peer
 */
ep = UCP_WORKER_GET_EP_BY_ID(worker, atomicreqh->req.ep_id, return UCS_OK,
"SW AMO request");
if (ucs_unlikely((amo_rsc_idx != UCP_MAX_RESOURCES) &&
(ucp_worker_iface_get_attr(worker,
amo_rsc_idx)->cap.flags &
Expand Down
32 changes: 25 additions & 7 deletions src/ucp/rma/rma_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,16 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_put_handler, (arg, data, length, am_flags),
{
ucp_put_hdr_t *puth = data;
ucp_worker_h worker = arg;
ucp_ep_h ep;

/* A closed EP is intentionally accepted here: it is still used for sending a
 * completion, which enables a flush on the peer
 */
ep = UCP_WORKER_GET_EP_BY_ID(worker, puth->ep_id, return UCS_OK,
"SW PUT request");
ucp_dt_contig_unpack(worker, (void*)puth->address, puth + 1,
length - sizeof(*puth), puth->mem_type);
ucp_rma_sw_send_cmpl(ucp_worker_get_ep_by_id(worker, puth->ep_id));
ucp_rma_sw_send_cmpl(ep);
return UCS_OK;
}

Expand All @@ -160,8 +166,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rma_cmpl_handler, (arg, data, length, am_flag
{
ucp_cmpl_hdr_t *putackh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker, putackh->ep_id);
ucp_ep_h ep;

/* A closed EP is intentionally accepted here: it is still used for handling a
 * completion, which enables a flush on the peer
 */
ep = UCP_WORKER_GET_EP_BY_ID(worker, putackh->ep_id, return UCS_OK,
"SW RMA completion");
ucp_ep_rma_remote_request_completed(ep);
return UCS_OK;
}
Expand Down Expand Up @@ -214,10 +225,14 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_req_handler, (arg, data, length, am_flags
{
ucp_get_req_hdr_t *getreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
getreqh->req.ep_id);
ucp_ep_h ep;
ucp_request_t *req;

/* A closed EP is intentionally accepted here: it is still used for sending the
 * GET operation data, which enables a flush on the peer
 */
ep = UCP_WORKER_GET_EP_BY_ID(worker, getreqh->req.ep_id, return UCS_OK,
"SW GET request");
req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate get reply");
Expand Down Expand Up @@ -246,10 +261,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_rep_handler, (arg, data, length, am_flags
ucp_worker_h worker = arg;
ucp_rma_rep_hdr_t *getreph = data;
size_t frag_length = length - sizeof(*getreph);
ucp_request_t *req = ucp_worker_get_request_by_id(worker,
getreph->req_id);
ucp_ep_h ep = req->send.ep;
ucp_request_t *req;
ucp_ep_h ep;

req = UCP_WORKER_GET_REQ_BY_ID(worker, getreph->req_id,
return UCS_OK,
"GET reply data %p", getreph);
ep = req->send.ep;
if (ep->worker->context->config.ext.proto_enable) {
// TODO use dt_iter.inl unpack
ucp_dt_contig_unpack(ep->worker,
Expand Down
Loading