Skip to content

Commit

Permalink
UCT/TCP: Implement flush of all outstanding operations
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Jul 27, 2021
1 parent 99748da commit 388e1a3
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 51 deletions.
19 changes: 12 additions & 7 deletions src/tools/perf/lib/libperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,16 @@ static void ucp_perf_test_free_mem(ucx_perf_context_t *perf)
perf->allocator->ucp_free(perf, perf->send_buffer, perf->ucp.send_memh);
}

/**
 * Progress the UCP worker of every thread context of a perf test.
 *
 * @param [in] arg  Pointer to the ucx_perf_context_t whose per-thread workers
 *                  are progressed. It is taken as void* so that the function
 *                  can be passed where a generic void(*)(void*) progress
 *                  callback is expected.
 */
static void ucp_perf_worker_progress(void *arg)
{
    ucx_perf_context_t *perf = arg;
    /* Unsigned index, same as the one ucp_perf_test_destroy_eps() uses for
     * params.thread_count, to avoid a mixed signed/unsigned comparison */
    unsigned i;

    for (i = 0; i < perf->params.thread_count; ++i) {
        ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker);
    }
}

static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf)
{
unsigned i, thread_count = perf->params.thread_count;
Expand All @@ -1174,7 +1184,7 @@ static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf)

if (UCS_PTR_IS_PTR(req)) {
do {
ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker);
ucp_perf_worker_progress(perf);
status = ucp_request_check_status(req);
} while (status == UCS_INPROGRESS);

Expand Down Expand Up @@ -1576,12 +1586,7 @@ void uct_perf_barrier(ucx_perf_context_t *perf)

void ucp_perf_barrier(ucx_perf_context_t *perf)
{
rte_call(perf, barrier, (void(*)(void*))ucp_worker_progress,
#if _OPENMP
(void*)perf->ucp.tctx[omp_get_thread_num()].perf.ucp.worker);
#else
(void*)perf->ucp.tctx[0].perf.ucp.worker);
#endif
rte_call(perf, barrier, ucp_perf_worker_progress, perf);
}

static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf)
Expand Down
21 changes: 11 additions & 10 deletions src/uct/tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,18 @@ enum {
UCT_TCP_EP_FLAG_ZCOPY_TX = UCS_BIT(2),
/* PUT RX operation is in progress on a given EP. */
UCT_TCP_EP_FLAG_PUT_RX = UCS_BIT(3),
/* PUT TX operation is waiting for an ACK on a given EP. */
UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK = UCS_BIT(4),
/* PUT RX operation is waiting for resources to send an ACK
* for received PUT operations on a given EP. */
UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK = UCS_BIT(5),
UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK = UCS_BIT(4),
/* EP is on connection matching context. */
UCT_TCP_EP_FLAG_ON_MATCH_CTX = UCS_BIT(6),
UCT_TCP_EP_FLAG_ON_MATCH_CTX = UCS_BIT(5),
/* EP failed and a callback for handling error is scheduled. */
UCT_TCP_EP_FLAG_FAILED = UCS_BIT(7),
UCT_TCP_EP_FLAG_FAILED = UCS_BIT(6),
/* EP is created to utilize CONNECT_TO_EP connection establishment
* method. */
UCT_TCP_EP_FLAG_CONNECT_TO_EP = UCS_BIT(8),
UCT_TCP_EP_FLAG_CONNECT_TO_EP = UCS_BIT(7),
/* EP is on EP PTR map. */
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(9)
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(8)
};


Expand Down Expand Up @@ -217,7 +215,7 @@ typedef enum uct_tcp_ep_am_id {
UCT_TCP_EP_PUT_REQ_AM_ID = UCT_AM_ID_MAX + 1,
/* AM ID reserved for TCP internal PUT ACK message */
UCT_TCP_EP_PUT_ACK_AM_ID = UCT_AM_ID_MAX + 2,
/* AM ID reserved for TCP internal PUT ACK message */
/* AM ID reserved for TCP internal keepalive message */
UCT_TCP_EP_KEEPALIVE_AM_ID = UCT_AM_ID_MAX + 3
} uct_tcp_ep_am_id_t;

Expand Down Expand Up @@ -258,8 +256,9 @@ typedef struct uct_tcp_ep_put_completion {
* TCP endpoint communication context
*/
typedef struct uct_tcp_ep_ctx {
uint32_t put_sn; /* Sequence number of last sent
* or received PUT operation */
uint32_t sn; /* Sequence number of last sent
* TX operation or received PUT
* operation */
void *buf; /* Partial send/recv data */
size_t length; /* How much data in the buffer */
size_t offset; /* How much data was sent (TX) or was
Expand Down Expand Up @@ -337,6 +336,8 @@ struct uct_tcp_ep {
* closed as soon as the EP is connected
* using the new fd */
uct_tcp_ep_cm_id_t cm_id; /* EP connection manager ID */
uint32_t put_cnt; /* Number of PUT operations scheduled */
uint32_t last_acked_sn; /* Last acked operation sequence number */
uct_tcp_ep_ctx_t tx; /* TX resources */
uct_tcp_ep_ctx_t rx; /* RX resources */
struct sockaddr_in peer_addr; /* Remote iface addr */
Expand Down
77 changes: 47 additions & 30 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ static inline void uct_tcp_ep_ctx_rewind(uct_tcp_ep_ctx_t *ctx)

static inline void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx)
{
ctx->put_sn = UINT32_MAX;
ctx->buf = NULL;
ctx->sn = UINT32_MAX;
ctx->buf = NULL;
uct_tcp_ep_ctx_rewind(ctx);
}

Expand Down Expand Up @@ -244,6 +244,8 @@ static UCS_CLASS_INIT_FUNC(uct_tcp_ep_t, uct_tcp_iface_t *iface,
self->flags = 0;
self->conn_state = UCT_TCP_EP_CONN_STATE_CLOSED;
self->cm_id.conn_sn = UCT_TCP_CM_CONN_SN_MAX;
self->last_acked_sn = UINT32_MAX;
self->put_cnt = 0;

ucs_list_head_init(&self->list);
ucs_queue_head_init(&self->pending_q);
Expand Down Expand Up @@ -621,7 +623,6 @@ void uct_tcp_ep_replace_ep(uct_tcp_ep_t *to_ep, uct_tcp_ep_t *from_ep)

to_ep->flags |= from_ep->flags & (UCT_TCP_EP_FLAG_ZCOPY_TX |
UCT_TCP_EP_FLAG_PUT_RX |
UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK |
UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK);

if (uct_tcp_ep_ctx_buf_need_progress(&to_ep->rx)) {
Expand Down Expand Up @@ -898,11 +899,8 @@ static inline void uct_tcp_ep_handle_put_ack(uct_tcp_ep_t *ep,
uct_tcp_iface_t);
uct_tcp_ep_put_completion_t *put_comp;

if (put_ack->sn == ep->tx.put_sn) {
/* Since there are no other PUT operations in-flight, can remove flag
* and decrement iface outstanding operations counter */
ucs_assert(ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK);
ep->flags &= ~UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK;
ucs_assert(ep->put_cnt != 0);
if (--ep->put_cnt == 0) {
uct_tcp_iface_outstanding_dec(iface);
}

Expand All @@ -912,6 +910,8 @@ static inline void uct_tcp_ep_handle_put_ack(uct_tcp_ep_t *ep,
uct_invoke_completion(put_comp->comp, UCS_OK);
ucs_mpool_put_inline(put_comp);
}

ep->last_acked_sn = put_ack->sn;
}

void uct_tcp_ep_pending_queue_dispatch(uct_tcp_ep_t *ep)
Expand Down Expand Up @@ -947,11 +947,11 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status
uct_tcp_ep_zcopy_completed(ep, ctx->comp, status);
}

if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) {
/* if the EP is waiting for the acknowledgment of the started
if (ep->put_cnt > 0) {
/* If the EP is waiting for the acknowledgment of the started
* PUT operation, decrease iface::outstanding counter */
uct_tcp_iface_outstanding_dec(iface);
ep->flags &= ~UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK;
ep->put_cnt = 0;
}

uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset);
Expand Down Expand Up @@ -1298,7 +1298,7 @@ static inline void uct_tcp_ep_handle_put_req(uct_tcp_ep_t *ep,
UCS_PTR_BYTE_OFFSET(ep->rx.buf, ep->rx.offset),
copied_length);
ep->rx.offset += copied_length;
ep->rx.put_sn = put_req->sn;
ep->rx.sn = put_req->sn;

/* Remove the flag that indicates that EP is sending PUT RX ACK in order
* to not ack the uncompleted PUT RX operation for which PUT REQ is being
Expand Down Expand Up @@ -1452,6 +1452,14 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
*hdr = ep->tx.buf;
(*hdr)->am_id = am_id;

++ep->tx.sn;
if (ep->tx.sn == ep->last_acked_sn) {
/* If the TX sequence number has become equal to the last acked sequence
* number, change the last acked one so that they differ; this makes the
* TCP EP flush operation request an ACK through PUT */
--ep->last_acked_sn;
}

return UCS_OK;

err_no_res:
Expand Down Expand Up @@ -1681,11 +1689,11 @@ static void uct_tcp_ep_post_put_ack(uct_tcp_ep_t *ep)
}

/* Send PUT ACK to confirm completing PUT operations with
* the last received sequence number == ep::rx::put_sn */
* the last received sequence number == ep::rx::sn */
ucs_assertv(hdr != NULL, "ep=%p", ep);
hdr->length = sizeof(*put_ack);
put_ack = (uct_tcp_ep_put_ack_hdr_t*)(hdr + 1);
put_ack->sn = ep->rx.put_sn;
put_ack->sn = ep->rx.sn;

uct_tcp_ep_am_send(ep, hdr);

Expand Down Expand Up @@ -1956,7 +1964,7 @@ uct_tcp_ep_put_comp_add(uct_tcp_ep_t *ep, uct_completion_t *comp, int wait_sn)
return UCS_ERR_NO_MEMORY;
}

put_comp->wait_put_sn = ep->tx.put_sn;
put_comp->wait_put_sn = ep->tx.sn;
put_comp->comp = comp;
ucs_queue_push(&ep->put_comp_q, &put_comp->elem);

Expand Down Expand Up @@ -1992,23 +2000,21 @@ ucs_status_t uct_tcp_ep_put_zcopy(uct_ep_h uct_ep, const uct_iov_t *iov,
ctx->super.length = sizeof(put_req);
put_req.addr = remote_addr;
put_req.length = ep->tx.length;
put_req.sn = ep->tx.put_sn + 1;
put_req.sn = ep->tx.sn;

status = uct_tcp_ep_am_sendv(ep, 0, &ctx->super, UCT_TCP_EP_PUT_ZCOPY_MAX,
&put_req, ctx->iov, ctx->iov_cnt);
if (ucs_unlikely(status != UCS_OK)) {
return status;
}

ep->tx.put_sn++;

if (!(ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK)) {
/* Add UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK flag and increment iface
* outstanding operations counter in order to ensure returning
* UCS_INPROGRESS from flush functions and do progressing.
* UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK flag has to be removed upon PUT
* ACK message receiving if there are no other PUT operations in-flight */
ep->flags |= UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK;
ucs_assert(ep->put_cnt != UINT32_MAX);
if (ep->put_cnt++ == 0) {
/* Increment iface outstanding operations counter in order to ensure
* returning UCS_INPROGRESS from flush functions and do progressing.
* The iface outstanding operations counter has to be decremented upon
* receiving a PUT ACK message if there are no other PUT operations in-flight
*/
uct_tcp_iface_outstanding_inc(iface);
}

Expand Down Expand Up @@ -2067,17 +2073,28 @@ ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
}

status = uct_tcp_ep_check_tx_res(ep);
if (status == UCS_ERR_NO_RESOURCE) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
return UCS_ERR_NO_RESOURCE;
if (ucs_unlikely(status != UCS_OK)) {
if (status == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
}
return UCS_OK;
}

if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) {
status = uct_tcp_ep_put_comp_add(ep, comp, ep->tx.put_sn);
if (ep->last_acked_sn != ep->tx.sn) {
/* Decrement the sequence number to not consider the flush operation
* for waiting ACK, the sequence number will be incremented in PUT
* Zcopy operation. PUT Zcopy sends PUT REQ message which triggers
* sending ACK message back. */
--ep->tx.sn;
status = uct_ep_put_zcopy(&ep->super.super, NULL, 0, 0, 0, NULL);
ucs_assert(status != UCS_ERR_NO_RESOURCE);

status = uct_tcp_ep_put_comp_add(ep, comp, ep->tx.sn);
if (status != UCS_OK) {
return status;
}

UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
return UCS_INPROGRESS;
}

Expand Down
2 changes: 1 addition & 1 deletion test/gtest/common/test_obj_size.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ UCS_TEST_F(test_obj_size, size) {
EXPECTED_SIZE(uct_base_ep_t, 8);
EXPECTED_SIZE(uct_rkey_bundle_t, 24);
EXPECTED_SIZE(uct_self_ep_t, 8);
EXPECTED_SIZE(uct_tcp_ep_t, 160);
EXPECTED_SIZE(uct_tcp_ep_t, 168);
# if HAVE_TL_RC
EXPECTED_SIZE(uct_rc_ep_t, 64);
EXPECTED_SIZE(uct_rc_verbs_ep_t, 80);
Expand Down
10 changes: 7 additions & 3 deletions test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,9 @@ bool ucp_test::check_tls(const std::string& tls)
ucp_test_base::entity::entity(const ucp_test_param& test_param,
ucp_config_t* ucp_config,
const ucp_worker_params_t& worker_params,
const ucp_test_base *test_owner)
: m_err_cntr(0), m_rejected_cntr(0), m_accept_err_cntr(0)
const ucp_test_base *test_owner) :
m_err_cntr(0), m_rejected_cntr(0), m_accept_err_cntr(0),
m_test(test_owner)
{
const int thread_type = test_param.variant.thread_type;
ucp_params_t local_ctx_params = test_param.variant.ctx_params;
Expand Down Expand Up @@ -987,7 +988,10 @@ void ucp_test_base::entity::ep_destructor(ucp_ep_h ep, entity *e)
ucs_status_t status;
ucp_tag_recv_info_t info;
do {
e->progress();
const ucp_test *test = dynamic_cast<const ucp_test*>(e->m_test);
ASSERT_TRUE(test != NULL);

test->progress();
status = ucp_request_test(req, &info);
} while (status == UCS_INPROGRESS);
EXPECT_EQ(UCS_OK, status);
Expand Down
1 change: 1 addition & 0 deletions test/gtest/ucp/ucp_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class ucp_test_base : public ucs::test_base {
size_t m_rejected_cntr;
size_t m_accept_err_cntr;
ucs::handle<ucp_ep_params_t*> m_server_ep_params;
const ucp_test_base *m_test;

private:
static void empty_send_completion(void *r, ucs_status_t status);
Expand Down

0 comments on commit 388e1a3

Please sign in to comment.