-
Notifications
You must be signed in to change notification settings - Fork 423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
UCP/AM: Drop AM data if rx ep is closed #5899
UCP/AM: Drop AM data if rx ep is closed #5899
Conversation
src/ucp/core/ucp_am.c
Outdated
} | ||
|
||
ep_ext = ucp_ep_ext_proto(ep); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
empty line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the empty line or don't need alignment by =
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the empty line or don't need the alignment by =
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'll remove the line
src/ucp/core/ucp_am.c
Outdated
return; | ||
} | ||
|
||
if (ucs_unlikely(!ucs_list_is_empty(&ep_ext->am.started_ams))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if I'm not mistaken we use likely/unlikely only at fast path
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/ucp/core/ucp_am.c
Outdated
@@ -1279,19 +1312,26 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_first_handler, | |||
size_t remaining; | |||
uint64_t recv_flags; | |||
|
|||
ep = ucp_worker_get_ep_by_id(worker, first_hdr->super.ep_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can move to declaration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/ucp/core/ucp_am.c
Outdated
} | ||
|
||
ep_ext = ucp_ep_ext_proto(ep); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the empty line or don't need alignment by =
src/ucp/core/ucp_am.c
Outdated
void *hdr; | ||
|
||
ep = ucp_worker_get_ep_by_id(worker, rts->super.sreq.ep_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can move to declaration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
test/gtest/ucp/test_ucp_am.cc
Outdated
receiver().progress(); | ||
if (m_am_received) { | ||
request_wait(sreq); | ||
UCS_TEST_SKIP_R("received all AMs before ep close"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/ucp/core/ucp_am.c
Outdated
} | ||
|
||
ep_ext = ucp_ep_ext_proto(ep); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the empty line or don't need the alignment by =
src/ucp/core/ucp_am.c
Outdated
void *hdr; | ||
|
||
ep = ucp_worker_get_ep_by_id(worker, rts->super.sreq.ep_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can initialize along with declaration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/ucp/core/ucp_am.c
Outdated
@@ -1279,19 +1312,26 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_long_first_handler, | |||
size_t remaining; | |||
uint64_t recv_flags; | |||
|
|||
ep = ucp_worker_get_ep_by_id(worker, first_hdr->super.ep_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can initialize along with declaration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/ucp/core/ucp_am.c
Outdated
@@ -62,17 +62,36 @@ void ucp_am_ep_init(ucp_ep_h ep) | |||
void ucp_am_ep_cleanup(ucp_ep_h ep) | |||
{ | |||
ucp_ep_ext_proto_t *ep_ext = ucp_ep_ext_proto(ep); | |||
ucp_recv_desc_t *first_rdesc, *tmp, *mid_rdesc; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can have only:
ucp_recv_desc_t *rdesc, *tmp_rdesc;
and use rdesc
and tmp_rdesc
for first/middle fragments processing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/ucp/core/ucp_am.c
Outdated
ucs_status_t status; | ||
|
||
if (ucs_unlikely(!ucp_am_ep_is_valid(ep))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, what if this is the last fragment received?
I guess, we could process it and report to a user, if it is not a REPLY proto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo, this would make code more complicated (need to process the message before dropping) while does not provide valuable benefit
@yosefe, please review |
src/ucp/core/ucp_am.c
Outdated
return ((ep != NULL) && | ||
!(ep->flags & (UCP_EP_FLAG_CLOSED | UCP_EP_FLAG_FAILED))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe remove ep from ptr map before closed? avoid branch
@evgeny-leksikov
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mean directly from ucp_ep_close_nb
and before request is completed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC we don't have any EPs wire exchanges which can't be dropped after EP started closing, so should work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should I do it here, or it is a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here
test/gtest/ucp/test_ucp_am.cc
Outdated
{ | ||
test_ucp_am_nbx *self = reinterpret_cast<test_ucp_am_nbx*>(arg); | ||
self->m_am_received = true; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can remove space line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
test/gtest/ucp/test_ucp_am.cc
Outdated
} | ||
|
||
EXPECT_EQ(rx_expected, m_am_received); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
bot:pipe:retest |
src/ucp/core/ucp_am.c
Outdated
@@ -26,6 +26,18 @@ | |||
#define UCP_AM_SHORT_REPLY_MAX_SIZE (UCS_ALLOCA_MAX_SIZE - \ | |||
sizeof(ucs_ptr_map_key_t)) | |||
|
|||
#define UCP_WORKER_GET_EP_BY_ID(_worker, _ep_id, _str, _action) \ | |||
({ \ | |||
ucp_ep_h __ep = ucp_worker_get_ep_by_id(_worker, _ep_id); \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why __ep and not _ep?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and i think ucp_worker_get_ep_by_id fails on assert if NULL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it is ucs_assertv((ep == NULL) || (ep->worker == worker)
src/ucp/core/ucp_am.c
Outdated
((__ep)->flags & (UCP_EP_FLAG_CLOSED | \ | ||
UCP_EP_FLAG_FAILED)))) { \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove from map during ep_close/fail
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
src/ucp/core/ucp_worker.inl
Outdated
{ | ||
ucp_ep_h ep = ucp_worker_get_ep_by_id(worker, id); | ||
|
||
if (ucs_likely(ep && !(ep->flags & |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ep != NULL
src/ucp/core/ucp_am.c
Outdated
ucs_trace_data("worker %p: drop AM %s on closed/failed ep %p", \ | ||
_worker, _str, __ep); \ | ||
_worker, _str, _ep); \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change log message to remote "AM"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please clarify?
@dmitrygx, do you have other comments? |
- If single fragment AM received without UCP_AM_SEND_REPLY flag it should be delivered to the user even if the corresponding rx ep is closed/failed. - If single fragment AM received with UCP_AM_SEND_REPLY flag it should be droppped if the corresponding rx ep is closed/failed, because reply ep can not be provided in the data callback. - If some fragment of multi-fragmented AM received and the corresponding rx ep is closed/failed, this fragment should be dropped regardless of send AM flags (no way to assemble a message without aux info storedd in ep extension) - If AM RTS is received and the corresponding rx ep is closed/failed, this RTS should be droppped and ATS with EP_TIMEOUT should be sent back to the sender.
4ec26d1
to
ee224c5
Compare
bot:pipe:retest |
bot:pipe:retest |
issue is #5978 bot:pipe:retest |
bot:pipe:retest |
1 similar comment
bot:pipe:retest |
What
should be delivered to the user even if the corresponding rx ep is
closed/failed.
should be droppped if the corresponding rx ep is closed/failed,
because reply ep can not be provided in the data callback.
is closed/failed, this fragment should be dropped regardless of send AM flags
(no way to assemble a message without aux info storedd in ep extension)
this RTS should be droppped and ATS with EP_TIMEOUT should be sent
back to the sender.
Why ?
Proper error handling