-
Notifications
You must be signed in to change notification settings - Fork 423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
UCT/TCP: Implement flush of all outstanding operations #7140
Conversation
92ef441
to
ef5509e
Compare
src/uct/tcp/tcp_ep.c
Outdated
* Zcopy operation. PUT Zcopy sends PUT REQ message which triggers | ||
* sending ACK message back. */ | ||
--ep->tx.sn; | ||
status = uct_ep_put_zcopy(&ep->super.super, NULL, 0, 0, 0, NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why shutdown doesn't work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdown()
doesn’t work if user ha star following flow:
ucp_ep_close_nbx(FLUSH);
ucp_worker_destroy();
Since uct_ep_close()
shutdowns the connection and we should wait for it completion in epoll_wait()
. So, real closing of the socket is deferred. And we destroy socket, when destroying Worker. It will be ok, if user had some the following flow:
ucp_ep_close_nbx(FLUSH);
// wait for some time to ensure that socket is closed after sholutdown
ucp_worker_destroy();
13d2a90
to
388e1a3
Compare
src/tools/perf/lib/libperf.c
Outdated
@@ -1174,7 +1184,7 @@ static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf) | |||
|
|||
if (UCS_PTR_IS_PTR(req)) { | |||
do { | |||
ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker); | |||
ucp_perf_worker_progress(perf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does it fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to progress all threads when doing a barrier or closing all EPs.
before the fix, only the first thread was progressed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did it actually cause a hang?
these threads are not communicating with one another so should not be a deadlock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, one process moved to destroy the 2nd EP, while another process is flushing the 1st EP.
So, we should progress the 1st worker in the first process to send ACK message to the 1st EP of the second process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so can we progress all "thread_count" close reqs in parallel? like we do in OMPI for example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then each thread should take care of it closing. in current perftest implementation, master thread closes (one by one) all EPs and progress workers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean still progress in master thread, but first start all close operations and then do the progress calls
@@ -987,7 +988,10 @@ void ucp_test_base::entity::ep_destructor(ucp_ep_h ep, entity *e) | |||
ucs_status_t status; | |||
ucp_tag_recv_info_t info; | |||
do { | |||
e->progress(); | |||
const ucp_test *test = dynamic_cast<const ucp_test*>(e->m_test); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it to progress TCP flush on the remote side?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it was the idea. otherwise, the test hangs.
src/uct/tcp/tcp_ep.c
Outdated
* UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK flag has to be removed upon PUT | ||
* ACK message receiving if there are no other PUT operations in-flight */ | ||
ep->flags |= UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK; | ||
ucs_assert(ep->put_cnt != UINT32_MAX); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe better return NO_RESOURCE
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? we could do several PUT operations simultaneously
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean instead of having an assert you can return NO_RESOURCE
, until some puts confirmed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, done
src/uct/tcp/tcp_ep.c
Outdated
if (status == UCS_ERR_NO_RESOURCE) { | ||
return UCS_ERR_NO_RESOURCE; | ||
} | ||
return UCS_OK; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why return OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/uct/tcp/tcp_ep.c
Outdated
if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) { | ||
status = uct_tcp_ep_put_comp_add(ep, comp, ep->tx.put_sn); | ||
if (ep->last_acked_sn != ep->tx.sn) { | ||
/* Decrement the sequence number to not consider the flush operation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does it mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it fixes hang of the following loop:
do {
uct_worker_progress(worker);
uct_ep_flush(ep);
} while (status = UCS_INPROGRESS);
since EP is always has outstanding PUT sent by flush. So, we don't want to count them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe insted call some internal function of tcp which will reuse the current sequence number?
it's confusing that sn is decremented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that it is confusing, but it should almost duplicate PUT code. Is it ok?
src/uct/tcp/tcp_ep.c
Outdated
@@ -1452,6 +1452,14 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep, | |||
*hdr = ep->tx.buf; | |||
(*hdr)->am_id = am_id; | |||
|
|||
++ep->tx.sn; | |||
if (ep->tx.sn == ep->last_acked_sn) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how can it be?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g. if we don't request ACK for long period of time. so, it happens that ep->tx.sn == ep->last_acked_sn == 2^32 - 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mean that ep->tx.sn
will wrap? Then why they will be necessarily 2^32 - 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the value can wrap.
No, it could be another value, just an example, if no PUT operations done at all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have an idea - will prepare a commit
@brminich your comments were address. could you review pls? |
src/uct/tcp/tcp_ep.c
Outdated
uct_tcp_ep_ctx_buf_empty(&ep->tx) & | ||
(ep->put_cnt != UINT32_MAX))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like it will fail on assert below if ep->put_cnt == UINT32_MAX
now, because of ep->conn_state
also do we really need this check for all ops?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we have to check it for all operations to avoid sending if something in the pending queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we assume puts will be unordered with other ops?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure that we can, IB support ordering for WRITE after SEND.
also, we have to check put_cnt
inside uct_ep_pending_add()
, otherwise the following test will be failed:
for (i = 0; i < UINT32_MAX; ++i) {
status = uct_ep_put_zcopy();
ucs_assert(status == UCS_INPROGRESS);
}
status = uct_ep_put_zcopy();
ucs_assert(status == UCS_ERR_NO_RESOURCES);
status = uct_ep_pending_add();
ucs_assert(status == UCS_OK);
status = uct_ep_am_short();
ucs_assert(status == UCS_ERR_NO_RESOURCES); // Fails
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brminich is it ok?
src/uct/tcp/tcp_ep.c
Outdated
@@ -69,7 +69,8 @@ static inline int uct_tcp_ep_ctx_buf_need_progress(uct_tcp_ep_ctx_t *ctx) | |||
static inline ucs_status_t uct_tcp_ep_check_tx_res(uct_tcp_ep_t *ep) | |||
{ | |||
if (ucs_likely((ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTED) && | |||
uct_tcp_ep_ctx_buf_empty(&ep->tx))) { | |||
uct_tcp_ep_ctx_buf_empty(&ep->tx) && | |||
(ep->put_cnt != UINT32_MAX))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you update ep->ctx
instead so that it will be "not empty" to avoid extra branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when we increment ep->put_cnt
, ep->tx
isn't empty. and when we do progress of TX oeprations we don't really know if it is PUT or some AM operations.
so, we can't do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can have some flag which indicates "CAN_SEND" and set it when both are true:
- TX is empty
- put_cnt != UINT32_MAX
but the checks will be in other place (data path) to manage this flag
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/uct/tcp/tcp_ep.c
Outdated
if (ep->tx.sn == ep->last_acked_sn) { | ||
/* If the TX sequence number is now the same as the last acked sequence | ||
* number, ensure that they are different to request ACK through PUT in | ||
* TCP ep flush operation */ | ||
--ep->last_acked_sn; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do it for put operations only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunately, no
if ep->tx.sn
is wrapped and now it is equal to ep->last_acked_sn
, flush will return OK immediately, but we don't want it
@brminich is it ok now? |
src/tools/perf/lib/libperf.c
Outdated
@@ -1174,7 +1184,7 @@ static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf) | |||
|
|||
if (UCS_PTR_IS_PTR(req)) { | |||
do { | |||
ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker); | |||
ucp_perf_worker_progress(perf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did it actually cause a hang?
these threads are not communicating with one another so should not be a deadlock
src/uct/tcp/tcp_ep.c
Outdated
ucs_assert(ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK); | ||
ep->flags &= ~UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK; | ||
ucs_assert(ep->put_cnt != 0); | ||
if (ucs_unlikely(ep->put_cnt == UINT32_MAX)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe make the sn 64 bit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, it makes sense
src/uct/tcp/tcp_ep.c
Outdated
if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) { | ||
status = uct_tcp_ep_put_comp_add(ep, comp, ep->tx.put_sn); | ||
if (ep->last_acked_sn != ep->tx.sn) { | ||
/* Decrement the sequence number to not consider the flush operation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe insted call some internal function of tcp which will reuse the current sequence number?
it's confusing that sn is decremented
coverity error is relevant
|
src/tools/perf/lib/libperf.c
Outdated
@@ -1174,7 +1184,7 @@ static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf) | |||
|
|||
if (UCS_PTR_IS_PTR(req)) { | |||
do { | |||
ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker); | |||
ucp_perf_worker_progress(perf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so can we progress all "thread_count" close reqs in parallel? like we do in OMPI for example
src/uct/tcp/tcp.h
Outdated
/* Endpoint can do TX operations if 3 conditions are true: | ||
* - endpoint is connected to a peer | ||
* - TX buffer is empty | ||
* - number of PUT operations done is not equal to UINT32_MAX */ | ||
#define UCT_TCP_EP_TX_DO_MAX 3 | ||
|
||
/* Endpoint can do RX operations if 1 conditions is true: | ||
* - RX buffer is empty */ | ||
#define UCT_TCP_EP_RX_DO_MAX 1 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/uct/tcp/tcp.h
Outdated
@@ -337,6 +346,8 @@ struct uct_tcp_ep { | |||
* closed as soon as the EP is connected | |||
* using the new fd */ | |||
uct_tcp_ep_cm_id_t cm_id; /* EP connection mananger ID */ | |||
uint32_t last_acked_sn; /* Last acked operation sequence number */ | |||
uint64_t put_cnt; /* Number of PUT operations scheduled */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on other handm making it 64but increases ep size. we can't really have 4G outstanding PUT operations, otheriwse sn would wrap around anyway, right?
so better return NO_RESOURCES if put_cnt >= INT32_MAX/2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I don't see any differences to have uint32_t put_cnt
and check put_cnt == UINT32_MAX
instead of
so better return NO_RESOURCES if put_cnt >= INT32_MAX/2
Also, it will require checking put_cnt
for other operations too. Then better to return back to what I suggested in 5a53785, i.e. having counter to simplify checking condition (it will be sing if
condition instead of 3 ones)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense to have 32-bit put_cnt
and move it along with last_acked_sn
under uct_tcp_ep_ctx_t
. So, they will be inside union and put_cnt
is valid for TX context, last_acked_sn
- for RX context.
src/uct/tcp/tcp_ep.c
Outdated
if (tx_sn_inc && (++ep->tx.sn == ep->last_acked_sn)) { | ||
/* If the TX sequence number is now the same as the last acked sequence | ||
* number, ensure that they are different to request ACK through PUT in | ||
* TCP ep flush operation */ | ||
--ep->last_acked_sn; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this PR could be done simpler (and w/o adding more counters):
- increase SN for flush operations as well
- keep flag on the ep of "whether there was put without flush":
- put turns the flag on
- flush is nop is flag is off, otherwise - put and turn the flag off
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, done
src/tools/perf/lib/libperf.c
Outdated
ucs_status_ptr_t **reqs; | ||
ucs_status_t status; | ||
|
||
reqs = ucs_malloc(thread_count * sizeof(*reqs), "ep_close_reqs"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use alloca
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/tools/perf/lib/libperf.c
Outdated
if (status != UCS_INPROGRESS) { | ||
--num_in_prog; | ||
ucp_request_release(reqs[i]); | ||
reqs[i] = NULL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe remove req from the array (reqs[i]=reqs[--num-in_prog])
also, not add NULL reqs to array after close_nb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/uct/tcp/tcp.h
Outdated
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(9) | ||
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(8), | ||
/* EP has some operations done without flush */ | ||
UCT_TCP_EP_FLAG_HAS_OPS_NO_FLUSH = UCS_BIT(9) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UCT_TCP_EP_FLAG_NEED_FLUSH
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/uct/tcp/tcp_ep.c
Outdated
{ | ||
ctx->put_sn = UINT32_MAX; | ||
ctx->buf = NULL; | ||
ctx->sn = UINT32_MAX; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why need to rename?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because we increment this sn
not only for PUT operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use one flag (NEED_FLUSH) instead of counting am
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/uct/tcp/tcp_ep.c
Outdated
ucs_assert(ep->tx.put_cnt != 0); | ||
if (--ep->tx.put_cnt == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why needed to change this logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since if some operation AM operations done after PUT operations are scheduled, ep->tx.sn
!= put_ack->sn
src/uct/tcp/tcp_ep.c
Outdated
} | ||
|
||
if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) { | ||
status = uct_tcp_ep_put_comp_add(ep, comp, ep->tx.put_sn); | ||
if (ep->rx.last_acked_sn != ep->tx.sn) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove UCT_TCP_EP_FLAG_HAS_OPS_NO_FLUSH flag when receiving ACK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, the following case won't work then:
do {
uct_worker_progress();
status = uct_ep_flush();
} while (status == UCS_INPROGRESS);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls fix the conflict by a merge commit
done |
e93811b
to
68fb8e3
Compare
@brminich could you review pls? |
What
Implement flush of all outstanding operations.
Why ?
To fix
uct_ep_flush()
which don't wait for all operations being completed. As a result if fixes the following case:How ?
last_acked_sn != tx.sn
in EPUCS_OK
if connection has already been closed.