Skip to content

Commit

Permalink
UCP/CORE: Drop packets with invalid REQ or UCP EP IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Dec 9, 2020
1 parent 971aad1 commit f025184
Show file tree
Hide file tree
Showing 11 changed files with 415 additions and 63 deletions.
23 changes: 14 additions & 9 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,17 @@ static void ucp_am_rndv_send_ats(ucp_worker_h worker,
ucs_status_t status)
{
ucp_request_t *req;
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, 1, return,
"AM RNDV ATS");
req = ucp_request_get(worker);
if (ucs_unlikely(req == NULL)) {
ucs_error("failed to allocate request for AM RNDV ATS");
return;
}

req->send.ep = ucp_worker_get_ep_by_id(worker, rts->super.sreq.ep_id);
req->send.ep = ep;
req->flags = 0;

ucp_rndv_req_send_ats(req, NULL, rts->super.sreq.req_id, status);
Expand Down Expand Up @@ -1176,8 +1179,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_handler_reply,
ucp_worker_h worker = (ucp_worker_h)am_arg;
ucp_ep_h reply_ep;

reply_ep = UCP_WORKER_GET_EP_BY_ID(worker, hdr->ep_id, "AM (reply proto)",
return UCS_OK);
reply_ep = UCP_WORKER_GET_EP_BY_ID(worker, hdr->ep_id, 1, return UCS_OK,
"AM (reply proto)");

return ucp_am_handler_common(worker, &hdr->super, sizeof(*hdr),
am_length, reply_ep, am_flags,
Expand Down Expand Up @@ -1234,6 +1237,7 @@ ucp_am_hdr_reply_ep(ucp_worker_h worker, uint16_t flags, ucp_ep_h ep,

*reply_ep_p = NULL;

out:
return 0ul;
}

Expand Down Expand Up @@ -1305,8 +1309,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_first_handler,
size_t remaining;
uint64_t recv_flags;

ep = UCP_WORKER_GET_EP_BY_ID(worker, first_hdr->super.ep_id,
"AM first fragment", return UCS_OK);
ep = UCP_WORKER_GET_EP_BY_ID(worker, first_hdr->super.ep_id, 1,
return UCS_OK, "AM first fragment");
remaining = first_hdr->total_size - (am_length - sizeof(*first_hdr));

if (ucs_unlikely(remaining == 0)) {
Expand Down Expand Up @@ -1378,8 +1382,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_middle_handler,
ucp_ep_h ep;
ucs_status_t status;

ep = UCP_WORKER_GET_EP_BY_ID(worker, mid_hdr->ep_id,
"AM middle fragment", return UCS_OK);
ep = UCP_WORKER_GET_EP_BY_ID(worker, mid_hdr->ep_id, 1,
return UCS_OK, "AM middle fragment");
ep_ext = ucp_ep_ext_proto(ep);
first_rdesc = ucp_am_find_first_rdesc(worker, ep_ext, msg_id);
if (first_rdesc != NULL) {
Expand Down Expand Up @@ -1421,10 +1425,11 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length,
ucs_status_t status, desc_status;
void *hdr;

ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, "AM RTS",
ep = UCP_WORKER_GET_EP_BY_ID(worker, rts->super.sreq.ep_id, 1,
{ status = UCS_ERR_ENDPOINT_TIMEOUT;
goto out_send_ats;
});
},
"AM RTS");

if (ucs_unlikely(!ucp_am_recv_check_id(worker, am_id))) {
status = UCS_ERR_INVALID_PARAM;
Expand Down
18 changes: 18 additions & 0 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -794,4 +794,22 @@ ucp_request_invoke_uct_completion(ucp_request_t *req, ucs_status_t status)
ucp_invoke_uct_completion(&req->send.state.uct_comp, status);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_recv_rndv(ucp_request_t *req, ucs_status_t status)
{
if (req->flags & UCP_REQUEST_FLAG_RECV_AM) {
ucp_request_complete_am_recv(req, status);
} else {
ucs_assert(req->flags & UCP_REQUEST_FLAG_RECV_TAG);
ucp_request_complete_tag_recv(req, status);
}
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_recv_rndv_common(ucp_request_t *rreq, ucs_status_t status)
{
ucp_request_recv_buffer_dereg(rreq);
ucp_request_complete_recv_rndv(rreq, status);
}

#endif
18 changes: 8 additions & 10 deletions src/ucp/core/ucp_worker.inl
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ ucp_worker_get_request_id(ucp_worker_h worker, ucp_request_t *req, int indirect)
static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_worker_get_request_by_id(ucp_worker_h worker, ucs_ptr_map_key_t id)
{
ucp_request_t* request;

request = (ucp_request_t*)ucs_ptr_map_get(&worker->ptr_map, id);
ucs_assert(request != NULL);
return request;
return (ucp_request_t*)ucs_ptr_map_get(&worker->ptr_map, id);
}

static UCS_F_ALWAYS_INLINE void
Expand Down Expand Up @@ -247,14 +243,16 @@ ucp_worker_get_rkey_config(ucp_worker_h worker, const ucp_rkey_config_key_t *key
return ucp_worker_add_rkey_config(worker, key, cfg_index_p);
}

#define UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _str, _action) \
#define UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _check_closed_ep, \
_action, _fmt_str, ...) \
({ \
ucp_ep_h _ep = ucp_worker_get_ep_by_id(_worker, _ep_id); \
if (ucs_unlikely((_ep == NULL) || \
((_ep)->flags & (UCP_EP_FLAG_CLOSED | \
UCP_EP_FLAG_FAILED)))) { \
ucs_trace_data("worker %p: drop %s on closed/failed ep %p", \
_worker, _str, _ep); \
(_check_closed_ep && \
((_ep)->flags & (UCP_EP_FLAG_CLOSED | \
UCP_EP_FLAG_FAILED))))) { \
ucs_diag("worker %p: ep id 0x%" PRIx64 " and closed/failed ep %p," \
" drop " _fmt_str, _worker, _ep_id, _ep, ##__VA_ARGS__); \
_action; \
} \
_ep; \
Expand Down
6 changes: 5 additions & 1 deletion src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,18 @@ static UCS_F_ALWAYS_INLINE ucp_request_t*
ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, ucs_ptr_map_key_t ep_id)
{
ucp_request_t *req;
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, ep_id, 1, return NULL,
"ACK for sync-send");
req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate UCP request");
return NULL;
}

req->flags = 0;
req->send.ep = ucp_worker_get_ep_by_id(worker, ep_id);
req->send.ep = ep;
req->send.uct.func = ucp_proto_progress_am_single;
req->send.proto.comp_cb = ucp_request_put;
req->send.proto.status = UCS_OK;
Expand Down
5 changes: 3 additions & 2 deletions src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_req_handler, (arg, data, length, am_fl
{
ucp_atomic_req_hdr_t *atomicreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
atomicreqh->req.ep_id);
ucp_rsc_index_t amo_rsc_idx = ucs_ffs64_safe(worker->atomic_tls);
ucp_request_t *req;
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, atomicreqh->req.ep_id, 1,
return UCS_OK, "SW AMO request");
if (ucs_unlikely((amo_rsc_idx != UCP_MAX_RESOURCES) &&
(ucp_worker_iface_get_attr(worker,
amo_rsc_idx)->cap.flags &
Expand Down
27 changes: 20 additions & 7 deletions src/ucp/rma/rma_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_put_handler, (arg, data, length, am_flags),
{
ucp_put_hdr_t *puth = data;
ucp_worker_h worker = arg;
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, puth->ep_id, 1, return UCS_OK,
"SW PUT request");
ucp_dt_contig_unpack(worker, (void*)puth->address, puth + 1,
length - sizeof(*puth), puth->mem_type);
ucp_rma_sw_send_cmpl(ucp_worker_get_ep_by_id(worker, puth->ep_id));
ucp_rma_sw_send_cmpl(ep);
return UCS_OK;
}

Expand All @@ -154,8 +157,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rma_cmpl_handler, (arg, data, length, am_flag
{
ucp_cmpl_hdr_t *putackh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker, putackh->ep_id);
ucp_ep_h ep;

ep = UCP_WORKER_GET_EP_BY_ID(worker, putackh->ep_id, 1, return UCS_OK,
"SW RMA completion");
ucp_ep_rma_remote_request_completed(ep);
return UCS_OK;
}
Expand Down Expand Up @@ -208,10 +213,11 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_req_handler, (arg, data, length, am_flags
{
ucp_get_req_hdr_t *getreqh = data;
ucp_worker_h worker = arg;
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker,
getreqh->req.ep_id);
ucp_ep_h ep;
ucp_request_t *req;

ep = UCP_WORKER_GET_EP_BY_ID(worker, getreqh->req.ep_id, 1, return UCS_OK,
"SW GET request");
req = ucp_request_get(worker);
if (req == NULL) {
ucs_error("failed to allocate get reply");
Expand Down Expand Up @@ -239,10 +245,17 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_get_rep_handler, (arg, data, length, am_flags
ucp_worker_h worker = arg;
ucp_rma_rep_hdr_t *getreph = data;
size_t frag_length = length - sizeof(*getreph);
ucp_request_t *req = ucp_worker_get_request_by_id(worker,
getreph->req_id);
ucp_ep_h ep = req->send.ep;
ucp_request_t *req;
ucp_ep_h ep;

req = ucp_worker_get_request_by_id(worker, getreph->req_id);
if (ucs_unlikely(req == NULL)) {
ucs_diag("unable to get request from GET reply data %p for non-existing"
" ep_id 0x%"PRIx64, getreph, getreph->req_id);
return UCS_OK;
}

ep = req->send.ep;
if (ep->worker->context->config.ext.proto_enable) {
// TODO use dt_iter.inl unpack
ucp_dt_contig_unpack(ep->worker,
Expand Down
52 changes: 43 additions & 9 deletions src/ucp/rndv/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_recv_frag_put_completion, (self),
/* rndv_req is NULL in case of put protocol */
if (!is_put_proto) {
rndv_req = ucp_worker_get_request_by_id(worker, rreq_remote_id);
if (ucs_unlikely(rndv_req == NULL)) {
ucs_diag("unable to get request from fragmented PUT request %p for"
" non-existing ep_id 0x%"PRIx64, freq, rreq_remote_id);
return;
}

/* pipeline recv get protocol */
rndv_req->send.state.dt.offset += freq->send.length;

Expand Down Expand Up @@ -1217,17 +1223,19 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_receive, (worker, rreq, rndv_rts_hdr, rkey_buf),

UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_receive", 0);

ep = UCP_WORKER_GET_EP_BY_ID(worker, rndv_rts_hdr->sreq.ep_id, 1, goto err,
"RNDV rts");

/* the internal send request allocated on receiver side (to perform a "get"
* operation, send "ATS" and "RTR") */
rndv_req = ucp_request_get(worker);
if (rndv_req == NULL) {
ucs_error("failed to allocate rendezvous reply");
goto out;
goto err;
}

rndv_req->send.ep = ucp_worker_get_ep_by_id(worker,
rndv_rts_hdr->sreq.ep_id);
rndv_req->flags = 0;
rndv_req->send.ep = ep;
rndv_req->send.mdesc = NULL;
rndv_req->send.pending_lane = UCP_NULL_LANE;
is_get_zcopy_failed = 0;
Expand All @@ -1248,7 +1256,6 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_receive, (worker, rreq, rndv_rts_hdr, rkey_buf),
}

/* if the receive side is not connected yet then the RTS was received on a stub ep */
ep = rndv_req->send.ep;
ep_config = ucp_ep_config(ep);
rndv_mode = worker->context->config.ext.rndv_mode;

Expand Down Expand Up @@ -1320,6 +1327,11 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_receive, (worker, rreq, rndv_rts_hdr, rkey_buf),

out:
UCS_ASYNC_UNBLOCK(&worker->async);
return;

err:
ucp_request_complete_recv_rndv_common(rreq, UCS_ERR_CONNECTION_RESET);
goto out;
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler,
Expand Down Expand Up @@ -1678,6 +1690,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_atp_handler,
ucp_request_t *req = ucp_worker_get_request_by_id(arg,
rep_hdr->req_id);

if (ucs_unlikely(req == NULL)) {
ucs_diag("unable to get request from RNDV ATP %p for non-existing"
" ep_id 0x%"PRIx64, rep_hdr, rep_hdr->req_id);
return UCS_OK;
}

if (req->flags & UCP_REQUEST_FLAG_RNDV_FRAG) {
/* received ATP for frag RTR request */
ucs_assert(req->super_req != NULL);
Expand All @@ -1699,14 +1717,24 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rtr_handler,
void *arg, void *data, size_t length, unsigned flags)
{
ucp_rndv_rtr_hdr_t *rndv_rtr_hdr = data;
ucp_request_t *sreq = ucp_worker_get_request_by_id(arg,
rndv_rtr_hdr->sreq_id);
ucp_ep_h ep = sreq->send.ep;
ucp_ep_config_t *ep_config = ucp_ep_config(ep);
ucp_context_h context = ep->worker->context;
ucp_request_t *sreq;
ucp_ep_h ep;
ucp_ep_config_t *ep_config;
ucp_context_h context;
ucs_status_t status;
int is_pipeline_rndv;

sreq = ucp_worker_get_request_by_id(arg, rndv_rtr_hdr->sreq_id);
if (ucs_unlikely(sreq == NULL)) {
ucs_diag("unable to get request from RNDV RTR %p for non-existing"
" ep_id 0x%"PRIx64, rndv_rtr_hdr, rndv_rtr_hdr->sreq_id);
return UCS_OK;
}

ep = sreq->send.ep;
ep_config = ucp_ep_config(ep);
context = ep->worker->context;

ucp_trace_req(sreq, "received rtr address 0x%"PRIx64" remote rreq_id"
"0x%"PRIx64, rndv_rtr_hdr->address, rndv_rtr_hdr->rreq_id);
UCS_PROFILE_REQUEST_EVENT(sreq, "rndv_rtr_recv", 0);
Expand Down Expand Up @@ -1815,6 +1843,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_handler,
size_t recv_len;

rreq = ucp_worker_get_request_by_id(worker, rndv_data_hdr->rreq_id);
if (ucs_unlikely(rreq == NULL)) {
ucs_diag("unable to get request from RNDV data %p for non-existing"
" ep_id 0x%"PRIx64, rndv_data_hdr, rndv_data_hdr->rreq_id);
return UCS_OK;
}

ucs_assert(!(rreq->flags & UCP_REQUEST_FLAG_RNDV_FRAG) &&
(rreq->flags & (UCP_REQUEST_FLAG_RECV_AM |
UCP_REQUEST_FLAG_RECV_TAG)));
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/stream/stream_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ ucp_stream_am_handler(void *am_arg, void *am_data, size_t am_length,

ucs_assert(am_length >= sizeof(ucp_stream_am_hdr_t));

ep = ucp_worker_get_ep_by_id(worker, data->hdr.ep_id);
ep = UCP_WORKER_GET_EP_BY_ID(worker, data->hdr.ep_id, 1, return UCS_OK,
"stream data");
ep_ext = ucp_ep_ext_proto(ep);

if (ucs_unlikely(ep->flags & (UCP_EP_FLAG_CLOSED |
Expand Down
5 changes: 3 additions & 2 deletions src/ucp/tag/eager_snd.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,11 @@ void ucp_tag_eager_sync_send_ack(ucp_worker_h worker, void *hdr, uint16_t recv_f
ucs_assert(reqhdr->req_id != UCP_REQUEST_ID_INVALID);
req = ucp_proto_ssend_ack_request_alloc(worker, reqhdr->ep_id);
if (req == NULL) {
ucs_fatal("could not allocate request");
/* drop the packet */
return;
}

req->send.proto.am_id = UCP_AM_ID_EAGER_SYNC_ACK;
req->send.proto.am_id = UCP_AM_ID_EAGER_SYNC_ACK;
req->send.proto.remote_req_id = reqhdr->req_id;

ucs_trace_req("send_sync_ack req %p ep %p", req, req->send.ep);
Expand Down
Loading

0 comments on commit f025184

Please sign in to comment.