Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/TCP: Implement flush of all outstanding operations #7140

Merged
merged 1 commit into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions src/tools/perf/lib/libperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,16 @@ static void uct_perf_test_cleanup_endpoints(ucx_perf_context_t *perf)
free(perf->uct.peers);
}

static void ucp_perf_worker_progress(void *arg)
{
ucx_perf_context_t *perf = arg;
int i;

for (i = 0; i < perf->params.thread_count; ++i) {
ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker);
}
}

static ucs_status_t ucp_perf_test_fill_params(ucx_perf_params_t *params,
ucp_params_t *ucp_params)
{
Expand Down Expand Up @@ -909,8 +919,10 @@ static ucs_status_t ucp_perf_test_fill_params(ucx_perf_params_t *params,
static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf)
{
unsigned i, thread_count = perf->params.thread_count;
ucs_status_ptr_t *req;
ucs_status_t status;
unsigned num_in_prog = 0;
ucs_status_ptr_t **reqs = ucs_alloca(thread_count * sizeof(*reqs));
ucs_status_ptr_t *req;
ucs_status_t status;

for (i = 0; i < thread_count; ++i) {
if (perf->ucp.tctx[i].perf.ucp.rkey != NULL) {
Expand All @@ -922,19 +934,25 @@ static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf)
UCP_EP_CLOSE_MODE_FLUSH);

if (UCS_PTR_IS_PTR(req)) {
do {
ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker);
status = ucp_request_check_status(req);
} while (status == UCS_INPROGRESS);

ucp_request_release(req);
reqs[num_in_prog++] = req;
} else if (UCS_PTR_STATUS(req) != UCS_OK) {
ucs_warn("failed to close ep %p on thread %d: %s\n",
perf->ucp.tctx[i].perf.ucp.ep, i,
ucs_status_string(UCS_PTR_STATUS(req)));
}
}
}

while (num_in_prog != 0) {
ucp_perf_worker_progress(perf);
for (i = 0; i < num_in_prog; ++i) {
status = ucp_request_check_status(reqs[i]);
if (status != UCS_INPROGRESS) {
ucp_request_release(reqs[i]);
reqs[i] = reqs[--num_in_prog];
}
}
}
}

static ucs_status_t ucp_perf_test_exchange_status(ucx_perf_context_t *perf,
Expand Down Expand Up @@ -1323,7 +1341,7 @@ void uct_perf_barrier(ucx_perf_context_t *perf)
(void*)perf->uct.worker);
}

void ucp_perf_barrier(ucx_perf_context_t *perf)
void ucp_perf_thread_barrier(ucx_perf_context_t *perf)
{
rte_call(perf, barrier, (void(*)(void*))ucp_worker_progress,
#if _OPENMP
Expand All @@ -1333,6 +1351,11 @@ void ucp_perf_barrier(ucx_perf_context_t *perf)
#endif
}

void ucp_perf_barrier(ucx_perf_context_t *perf)
{
rte_call(perf, barrier, ucp_perf_worker_progress, perf);
dmitrygx marked this conversation as resolved.
Show resolved Hide resolved
}

static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf)
{
ucx_perf_params_t *params = &perf->params;
Expand Down Expand Up @@ -1566,7 +1589,7 @@ ucx_perf_funcs_t ucx_perf_funcs[] = {
[UCX_PERF_API_UCT] = {uct_perf_setup, uct_perf_cleanup,
uct_perf_test_dispatch, uct_perf_barrier},
[UCX_PERF_API_UCP] = {ucp_perf_setup, ucp_perf_cleanup,
ucp_perf_test_dispatch, ucp_perf_barrier}
ucp_perf_test_dispatch, ucp_perf_thread_barrier}
};

ucs_status_t ucx_perf_run(const ucx_perf_params_t *params,
Expand Down
1 change: 1 addition & 0 deletions src/tools/perf/lib/libperf_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ ucs_status_t uct_perf_test_dispatch(ucx_perf_context_t *perf);
ucs_status_t ucp_perf_test_dispatch(ucx_perf_context_t *perf);
void ucx_perf_calc_result(ucx_perf_context_t *perf, ucx_perf_result_t *result);
void uct_perf_barrier(ucx_perf_context_t *perf);
void ucp_perf_thread_barrier(ucx_perf_context_t *perf);
void ucp_perf_barrier(ucx_perf_context_t *perf);

ucs_status_t ucp_perf_test_alloc_mem(ucx_perf_context_t *perf);
Expand Down
6 changes: 4 additions & 2 deletions src/uct/tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ enum {
* method. */
UCT_TCP_EP_FLAG_CONNECT_TO_EP = UCS_BIT(8),
/* EP is on EP PTR map. */
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(9)
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(9),
/* EP has some operations done without flush */
UCT_TCP_EP_FLAG_NEED_FLUSH = UCS_BIT(10)
};


Expand Down Expand Up @@ -217,7 +219,7 @@ typedef enum uct_tcp_ep_am_id {
UCT_TCP_EP_PUT_REQ_AM_ID = UCT_AM_ID_MAX + 1,
/* AM ID reserved for TCP internal PUT ACK message */
UCT_TCP_EP_PUT_ACK_AM_ID = UCT_AM_ID_MAX + 2,
/* AM ID reserved for TCP internal PUT ACK message */
/* AM ID reserved for TCP internal keepalive message */
UCT_TCP_EP_KEEPALIVE_AM_ID = UCT_AM_ID_MAX + 3
} uct_tcp_ep_am_id_t;

Expand Down
64 changes: 45 additions & 19 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,29 @@ static inline ucs_status_t uct_tcp_ep_check_tx_res(uct_tcp_ep_t *ep)
return UCS_ERR_NO_RESOURCE;
}

static inline ucs_status_t uct_tcp_ep_ctx_buf_alloc(uct_tcp_ep_t *ep,
uct_tcp_ep_ctx_t *ctx,
ucs_mpool_t *mpool)
{
ucs_assertv(ctx->buf == NULL, "tcp_ep=%p", ep);

ctx->buf = ucs_mpool_get_inline(mpool);
if (ucs_unlikely(ctx->buf == NULL)) {
ucs_warn("tcp_ep %p: unable to get a buffer from %p memory pool", ep,
mpool);
return UCS_ERR_NO_MEMORY;
}

return UCS_OK;
}

static inline void uct_tcp_ep_ctx_rewind(uct_tcp_ep_ctx_t *ctx)
{
ctx->offset = 0;
ctx->length = 0;
}

static inline void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx)
static void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx)
{
ctx->put_sn = UINT32_MAX;
ctx->buf = NULL;
Expand Down Expand Up @@ -622,7 +638,8 @@ void uct_tcp_ep_replace_ep(uct_tcp_ep_t *to_ep, uct_tcp_ep_t *from_ep)
to_ep->flags |= from_ep->flags & (UCT_TCP_EP_FLAG_ZCOPY_TX |
UCT_TCP_EP_FLAG_PUT_RX |
UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK |
UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK);
UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK |
UCT_TCP_EP_FLAG_NEED_FLUSH);

if (uct_tcp_ep_ctx_buf_need_progress(&to_ep->rx)) {
/* If some data was already read, we have to process it */
Expand Down Expand Up @@ -1331,10 +1348,8 @@ static unsigned uct_tcp_ep_progress_am_rx(uct_tcp_ep_t *ep)
ucs_trace_func("ep=%p", ep);

if (!uct_tcp_ep_ctx_buf_need_progress(&ep->rx)) {
ucs_assert(ep->rx.buf == NULL);
ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
if (ucs_unlikely(ep->rx.buf == NULL)) {
ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
if (ucs_unlikely(uct_tcp_ep_ctx_buf_alloc(
ep, &ep->rx, &iface->rx_mpool) != UCS_OK)) {
return 0;
}

Expand Down Expand Up @@ -1442,15 +1457,14 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
return status;
}

ucs_assertv(ep->tx.buf == NULL, "ep=%p", ep);

ep->tx.buf = ucs_mpool_get_inline(&iface->tx_mpool);
if (ucs_unlikely(ep->tx.buf == NULL)) {
status = uct_tcp_ep_ctx_buf_alloc(ep, &ep->tx, &iface->tx_mpool);
if (ucs_unlikely(status != UCS_OK)) {
goto err_no_res;
}

*hdr = ep->tx.buf;
(*hdr)->am_id = am_id;
ep->flags |= UCT_TCP_EP_FLAG_NEED_FLUSH;

return UCS_OK;

Expand Down Expand Up @@ -1503,9 +1517,8 @@ static unsigned uct_tcp_ep_progress_magic_number_rx(void *arg)
uint64_t magic_number;

if (ep->rx.buf == NULL) {
ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
if (ucs_unlikely(ep->rx.buf == NULL)) {
ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
if (ucs_unlikely(uct_tcp_ep_ctx_buf_alloc(
ep, &ep->rx, &iface->rx_mpool) != UCS_OK)) {
return 0;
}
}
Expand Down Expand Up @@ -1669,8 +1682,8 @@ static void uct_tcp_ep_post_put_ack(uct_tcp_ep_t *ep)
/* Make sure that we are sending nothing through this EP at the moment.
* This check is needed to avoid mixing AM/PUT data sent from this EP
* and this PUT ACK message */
status = uct_tcp_ep_am_prepare(iface, ep,
UCT_TCP_EP_PUT_ACK_AM_ID, &hdr);
status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_PUT_ACK_AM_ID,
&hdr);
if (status != UCS_OK) {
if (status == UCS_ERR_NO_RESOURCE) {
ep->flags |= UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK;
Expand Down Expand Up @@ -2067,9 +2080,20 @@ ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
}

status = uct_tcp_ep_check_tx_res(ep);
if (status == UCS_ERR_NO_RESOURCE) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
brminich marked this conversation as resolved.
Show resolved Hide resolved
return UCS_ERR_NO_RESOURCE;
if (ucs_unlikely(status != UCS_OK)) {
return status;
}

if (ep->flags & UCT_TCP_EP_FLAG_NEED_FLUSH) {
status = uct_tcp_ep_put_zcopy(&ep->super.super, NULL, 0, 0, 0,
NULL);
ucs_assert(status != UCS_ERR_NO_RESOURCE);
brminich marked this conversation as resolved.
Show resolved Hide resolved
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
return status;
}

ep->flags &= ~UCT_TCP_EP_FLAG_NEED_FLUSH;
ucs_assert(ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK);
}

if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) {
Expand All @@ -2078,6 +2102,7 @@ ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
return status;
}

UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
return UCS_INPROGRESS;
}

Expand All @@ -2095,7 +2120,8 @@ uct_tcp_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);

status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_KEEPALIVE_AM_ID, &hdr);
status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_KEEPALIVE_AM_ID,
&hdr);
if (status != UCS_OK) {
return (status == UCS_ERR_NO_RESOURCE) ? UCS_OK : status;
}
Expand Down
10 changes: 7 additions & 3 deletions test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,9 @@ bool ucp_test::check_tls(const std::string& tls)
ucp_test_base::entity::entity(const ucp_test_param& test_param,
ucp_config_t* ucp_config,
const ucp_worker_params_t& worker_params,
const ucp_test_base *test_owner)
: m_err_cntr(0), m_rejected_cntr(0), m_accept_err_cntr(0)
const ucp_test_base *test_owner) :
m_err_cntr(0), m_rejected_cntr(0), m_accept_err_cntr(0),
m_test(test_owner)
{
const int thread_type = test_param.variant.thread_type;
ucp_params_t local_ctx_params = test_param.variant.ctx_params;
Expand Down Expand Up @@ -987,7 +988,10 @@ void ucp_test_base::entity::ep_destructor(ucp_ep_h ep, entity *e)
ucs_status_t status;
ucp_tag_recv_info_t info;
do {
e->progress();
const ucp_test *test = dynamic_cast<const ucp_test*>(e->m_test);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it to progress TCP flush on the remote side?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it was the idea. otherwise, the test hangs.

ASSERT_TRUE(test != NULL);

test->progress();
status = ucp_request_test(req, &info);
} while (status == UCS_INPROGRESS);
EXPECT_EQ(UCS_OK, status);
Expand Down
1 change: 1 addition & 0 deletions test/gtest/ucp/ucp_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class ucp_test_base : public ucs::test_base {
size_t m_rejected_cntr;
size_t m_accept_err_cntr;
ucs::handle<ucp_ep_params_t*> m_server_ep_params;
const ucp_test_base *m_test;

private:
static void empty_send_completion(void *r, ucs_status_t status);
Expand Down