Skip to content

Commit

Permalink
UCP/CORE: Drop packets with invalid REQ or UCP EP IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Jan 25, 2021
1 parent 567e3ed commit 49fbd8e
Show file tree
Hide file tree
Showing 18 changed files with 530 additions and 171 deletions.
26 changes: 15 additions & 11 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,17 @@ static void ucp_am_rndv_send_ats(ucp_worker_h worker,
ucs_status_t status)
{
ucp_request_t *req;
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, return,
"AM RNDV ATS");
req = ucp_request_get(worker);
if (ucs_unlikely(req == NULL)) {
ucs_error("failed to allocate request for AM RNDV ATS");
return;
}

req->send.ep = ucp_worker_get_ep_by_id(worker, rts->super.sreq.ep_id);
req->send.ep = ep;
req->flags = 0;

ucp_rndv_req_send_ats(req, NULL, rts->super.sreq.req_id, status);
Expand Down Expand Up @@ -1170,8 +1173,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_handler_reply,
ucp_worker_h worker = (ucp_worker_h)am_arg;
ucp_ep_h reply_ep;

reply_ep = UCP_WORKER_GET_EP_BY_ID(worker, hdr->ep_id, "AM (reply proto)",
return UCS_OK);
reply_ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, hdr->ep_id, return UCS_OK,
"AM (reply proto)");

return ucp_am_handler_common(worker, &hdr->super, sizeof(*hdr),
am_length, reply_ep, am_flags,
Expand Down Expand Up @@ -1299,8 +1302,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_first_handler,
size_t remaining;
uint64_t recv_flags;

ep = UCP_WORKER_GET_EP_BY_ID(worker, first_hdr->super.ep_id,
"AM first fragment", return UCS_OK);
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, first_hdr->super.ep_id,
return UCS_OK, "AM first fragment");
remaining = first_hdr->total_size - (am_length - sizeof(*first_hdr));

if (ucs_unlikely(remaining == 0)) {
Expand Down Expand Up @@ -1372,8 +1375,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_middle_handler,
ucp_ep_h ep;
ucs_status_t status;

ep = UCP_WORKER_GET_EP_BY_ID(worker, mid_hdr->ep_id,
"AM middle fragment", return UCS_OK);
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, mid_hdr->ep_id,
return UCS_OK, "AM middle fragment");
ep_ext = ucp_ep_ext_proto(ep);
first_rdesc = ucp_am_find_first_rdesc(worker, ep_ext, msg_id);
if (first_rdesc != NULL) {
Expand Down Expand Up @@ -1415,10 +1418,11 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length,
ucs_status_t status, desc_status;
void *hdr;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, "AM RTS",
{ status = UCS_ERR_ENDPOINT_TIMEOUT;
goto out_send_ats;
});
ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, rts->super.sreq.ep_id,
{ status = UCS_ERR_CANCELED;
goto out_send_ats;
},
"AM RTS");

if (ucs_unlikely(!ucp_am_recv_check_id(worker, am_id))) {
status = UCS_ERR_INVALID_PARAM;
Expand Down
19 changes: 14 additions & 5 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,31 @@ ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,

/**
 * Detach an endpoint from its worker and destroy it.
 *
 * Calls ucs_callbackq_remove_if() on the worker's UCT progress queue with the
 * ucp_wireup_msg_ack_cb_pred predicate for this endpoint, removes the endpoint
 * from keepalive tracking (unless it is flagged UCP_EP_FLAG_INTERNAL), unlinks
 * it from its ep_list, releases its local id and finally calls
 * ucp_ep_destroy_base().
 *
 * @param [in] ep  Endpoint to delete.
 */
void ucp_ep_delete(ucp_ep_h ep)
{
    ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
                            ucp_wireup_msg_ack_cb_pred, ep);
    if (!(ep->flags & UCP_EP_FLAG_INTERNAL)) {
        ucp_worker_keepalive_remove_ep(ep);
    }

    ucs_list_del(&ucp_ep_ext_gen(ep)->ep_list);
    if (!(ep->flags & UCP_EP_FLAG_FAILED)) {
        /* ucp_ep_release_id() asserts that the endpoint is not flagged as
         * failed, so the id is released here only for non-failed endpoints */
        ucp_ep_release_id(ep);
    }

    ucp_ep_destroy_base(ep);
}

/**
 * Release the local id of an endpoint by deleting it from the pointer map of
 * the endpoint's worker.
 *
 * The endpoint must not be flagged UCP_EP_FLAG_FAILED (asserted). A failure of
 * ucs_ptr_map_del() is only reported with a warning. This function does not
 * destroy the endpoint itself.
 *
 * @param [in] ep  Endpoint whose local id is released.
 */
void ucp_ep_release_id(ucp_ep_h ep)
{
    ucs_status_t status;

    ucs_assert(!(ep->flags & UCP_EP_FLAG_FAILED));

    status = ucs_ptr_map_del(&ep->worker->ptr_map, ucp_ep_local_id(ep));
    if (status != UCS_OK) {
        ucs_warn("ep %p local id 0x%" PRIxPTR ": ucs_ptr_map_del failed: %s",
                 ep, ucp_ep_local_id(ep), ucs_status_string(status));
    }
}

ucs_status_t
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,

/* Detach the endpoint from its worker (progress callbacks, keepalive, ep list),
 * release its local id unless it is flagged as failed, and destroy it. */
void ucp_ep_delete(ucp_ep_h ep);

/* Delete the endpoint's local id from the worker pointer map. The endpoint must
 * not be flagged UCP_EP_FLAG_FAILED when this is called. */
void ucp_ep_release_id(ucp_ep_h ep);

ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
ucp_wireup_ep_t **wireup_ep);

Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep,
goto out_ok;
}

ucp_ep_release_id(ucp_ep);
ucp_ep->flags |= UCP_EP_FLAG_FAILED;

if (ucp_ep_config(ucp_ep)->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) {
Expand Down
46 changes: 33 additions & 13 deletions src/ucp/core/ucp_worker.inl
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ ucp_worker_get_request_id(ucp_worker_h worker, ucp_request_t *req, int indirect)
/**
 * Look up a request in the worker's pointer map by its id.
 *
 * The result of ucs_ptr_map_get() is returned as-is and is not asserted to be
 * non-NULL: callers that may receive an id from the wire are expected to check
 * the result themselves (UCP_WORKER_GET_REQ_BY_ID does so).
 *
 * @param [in] worker  Worker that owns the pointer map.
 * @param [in] id      Request id to look up.
 *
 * @return The request stored under @a id, as returned by ucs_ptr_map_get().
 */
static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_worker_get_request_by_id(ucp_worker_h worker, ucs_ptr_map_key_t id)
{
    return (ucp_request_t*)ucs_ptr_map_get(&worker->ptr_map, id);
}

static UCS_F_ALWAYS_INLINE void
Expand Down Expand Up @@ -265,17 +261,41 @@ ucp_worker_get_rkey_config(ucp_worker_h worker, const ucp_rkey_config_key_t *key
return ucp_worker_add_rkey_config(worker, key, cfg_index_p);
}

/*
 * Look up an endpoint by id on a worker (GCC statement expression).
 *
 * If ucp_worker_get_ep_by_id() yields NULL, a trace message built from
 * _fmt_str and the variadic arguments is emitted and _action is executed
 * (callers pass e.g. "return UCS_OK" or a block ending in goto). The
 * expression evaluates to the endpoint found. Flags of the endpoint are not
 * examined here, so an endpoint that is already closed is still returned.
 *
 * Note: _worker and _ep_id are evaluated more than once.
 */
#define UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _action, _fmt_str, ...) \
    ({ \
        ucp_ep_h __ep = ucp_worker_get_ep_by_id(_worker, _ep_id); \
        if (ucs_unlikely(__ep == NULL)) { \
            ucs_trace_data("worker %p: ep id 0x%" PRIx64 " was not found, drop " \
                           _fmt_str, _worker, _ep_id, ##__VA_ARGS__); \
            _action; \
        } \
        __ep; \
    })

/*
 * Same lookup as UCP_WORKER_GET_EP_BY_ID, with one extra check: when the
 * endpoint was found but carries UCP_EP_FLAG_CLOSED, a trace message is
 * emitted and _action is executed as well. The expression evaluates to the
 * endpoint returned by the lookup.
 */
#define UCP_WORKER_GET_VALID_EP_BY_ID(_worker, _ep_id, _action, _fmt_str, ...) \
    ({ \
        ucp_ep_h _found_ep = UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _action, \
                                                     _fmt_str, ##__VA_ARGS__); \
        if (_found_ep != NULL) { \
            if (ucs_unlikely(_found_ep->flags & UCP_EP_FLAG_CLOSED)) { \
                ucs_trace_data("worker %p: ep id 0x%" PRIx64 \
                               " was already closed ep %p, drop " _fmt_str, \
                               _worker, _ep_id, _found_ep, ##__VA_ARGS__); \
                _action; \
            } \
        } \
        _found_ep; \
    })

/*
 * Look up a request by id on a worker (GCC statement expression).
 *
 * If ucp_worker_get_request_by_id() yields NULL, a trace message built from
 * _fmt_str and the variadic arguments is emitted and _action is executed
 * (callers pass e.g. "return UCS_OK"). The expression evaluates to the request
 * found.
 *
 * Note: _worker and _req_id are evaluated more than once.
 */
#define UCP_WORKER_GET_REQ_BY_ID(_worker, _req_id, _action, _fmt_str, ...) \
    ({ \
        ucp_request_t *_req = ucp_worker_get_request_by_id(_worker, _req_id); \
        if (ucs_unlikely(_req == NULL)) { \
            ucs_trace_data("worker %p: req id 0x%" PRIx64 " doesn't exist" \
                           " drop " _fmt_str, _worker, _req_id, \
                           ##__VA_ARGS__); \
            _action; \
        } \
        _req; \
    })

#endif
9 changes: 4 additions & 5 deletions src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -518,17 +518,16 @@ ucp_proto_get_short_max(const ucp_request_t *req,
}

static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, ucs_ptr_map_key_t ep_id)
ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, ucp_ep_h ep)
{
ucp_request_t *req;

req = ucp_request_get(worker);
ucp_request_t *req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate UCP request");
return NULL;
}

req->flags = 0;
req->send.ep = ucp_worker_get_ep_by_id(worker, ep_id);
req->send.ep = ep;
req->send.uct.func = ucp_proto_progress_am_single;
req->send.proto.comp_cb = ucp_request_put;
req->send.proto.status = UCS_OK;
Expand Down
8 changes: 6 additions & 2 deletions src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_req_handler, (arg, data, length, am_fl
{
ucp_atomic_req_hdr_t *atomicreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
atomicreqh->req.ep_id);
ucp_rsc_index_t amo_rsc_idx = ucs_ffs64_safe(worker->atomic_tls);
ucp_request_t *req;
ucp_ep_h ep;

/* allow getting closed EP to be used for sending a completion or AMO data to
* enable flush on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, atomicreqh->req.ep_id, return UCS_OK,
"SW AMO request");
if (ucs_unlikely((amo_rsc_idx != UCP_MAX_RESOURCES) &&
(ucp_worker_iface_get_attr(worker,
amo_rsc_idx)->cap.flags &
Expand Down
32 changes: 25 additions & 7 deletions src/ucp/rma/rma_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,16 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_put_handler, (arg, data, length, am_flags),
{
ucp_put_hdr_t *puth = data;
ucp_worker_h worker = arg;
ucp_ep_h ep;

/* allow getting closed EP to be used for sending a completion to enable flush
* on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, puth->ep_id, return UCS_OK,
"SW PUT request");
ucp_dt_contig_unpack(worker, (void*)puth->address, puth + 1,
length - sizeof(*puth), puth->mem_type);
ucp_rma_sw_send_cmpl(ucp_worker_get_ep_by_id(worker, puth->ep_id));
ucp_rma_sw_send_cmpl(ep);
return UCS_OK;
}

Expand All @@ -160,8 +166,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rma_cmpl_handler, (arg, data, length, am_flag
{
ucp_cmpl_hdr_t *putackh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker, putackh->ep_id);
ucp_ep_h ep;

/* allow getting closed EP to be used for handling a completion to enable flush
* on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, putackh->ep_id, return UCS_OK,
"SW RMA completion");
ucp_ep_rma_remote_request_completed(ep);
return UCS_OK;
}
Expand Down Expand Up @@ -214,10 +225,14 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_req_handler, (arg, data, length, am_flags
{
ucp_get_req_hdr_t *getreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
getreqh->req.ep_id);
ucp_ep_h ep;
ucp_request_t *req;

/* allow getting closed EP to be used for sending a GET operation data to enable
* flush on a peer
*/
ep = UCP_WORKER_GET_EP_BY_ID(worker, getreqh->req.ep_id, return UCS_OK,
"SW GET request");
req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate get reply");
Expand Down Expand Up @@ -246,10 +261,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_rep_handler, (arg, data, length, am_flags
ucp_worker_h worker = arg;
ucp_rma_rep_hdr_t *getreph = data;
size_t frag_length = length - sizeof(*getreph);
ucp_request_t *req = ucp_worker_get_request_by_id(worker,
getreph->req_id);
ucp_ep_h ep = req->send.ep;
ucp_request_t *req;
ucp_ep_h ep;

req = UCP_WORKER_GET_REQ_BY_ID(worker, getreph->req_id,
return UCS_OK,
"GET reply data %p", getreph);
ep = req->send.ep;
if (ep->worker->context->config.ext.proto_enable) {
// TODO use dt_iter.inl unpack
ucp_dt_contig_unpack(ep->worker,
Expand Down
Loading

0 comments on commit 49fbd8e

Please sign in to comment.