Skip to content

Commit

Permalink
Merge pull request #7188 from dmitrygx/topic/uct/tcp_ep_flush_v1_11
Browse files Browse the repository at this point in the history
UCT/TCP: Implement flush of all outstanding operations [v1.11.x]
  • Loading branch information
yosefe authored Aug 5, 2021
2 parents 211dd75 + eb2f218 commit 2a26ca6
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 34 deletions.
43 changes: 33 additions & 10 deletions src/tools/perf/lib/libperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,16 @@ static void uct_perf_test_cleanup_endpoints(ucx_perf_context_t *perf)
free(perf->uct.peers);
}

/**
 * Progress callback that advances every per-thread UCP worker of a perf
 * context once.
 *
 * @param [in] arg  Opaque callback argument; it is used as a pointer to the
 *                  ucx_perf_context_t whose workers are progressed.
 */
static void ucp_perf_worker_progress(void *arg)
{
    ucx_perf_context_t *perf = arg;
    unsigned i;

    /* Use an unsigned index, as ucp_perf_test_destroy_eps() does for the same
     * thread_count bound, to avoid a signed/unsigned comparison */
    for (i = 0; i < perf->params.thread_count; ++i) {
        ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker);
    }
}

static ucs_status_t ucp_perf_test_fill_params(ucx_perf_params_t *params,
ucp_params_t *ucp_params)
{
Expand Down Expand Up @@ -1153,8 +1163,10 @@ static void ucp_perf_test_free_mem(ucx_perf_context_t *perf)
static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf)
{
unsigned i, thread_count = perf->params.thread_count;
ucs_status_ptr_t *req;
ucs_status_t status;
unsigned num_in_prog = 0;
ucs_status_ptr_t **reqs = ucs_alloca(thread_count * sizeof(*reqs));
ucs_status_ptr_t *req;
ucs_status_t status;

for (i = 0; i < thread_count; ++i) {
if (perf->ucp.tctx[i].perf.ucp.rkey != NULL) {
Expand All @@ -1166,19 +1178,25 @@ static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf)
UCP_EP_CLOSE_MODE_FLUSH);

if (UCS_PTR_IS_PTR(req)) {
do {
ucp_worker_progress(perf->ucp.tctx[i].perf.ucp.worker);
status = ucp_request_check_status(req);
} while (status == UCS_INPROGRESS);

ucp_request_release(req);
reqs[num_in_prog++] = req;
} else if (UCS_PTR_STATUS(req) != UCS_OK) {
ucs_warn("failed to close ep %p on thread %d: %s\n",
perf->ucp.tctx[i].perf.ucp.ep, i,
ucs_status_string(UCS_PTR_STATUS(req)));
}
}
}

while (num_in_prog != 0) {
ucp_perf_worker_progress(perf);
for (i = 0; i < num_in_prog; ++i) {
status = ucp_request_check_status(reqs[i]);
if (status != UCS_INPROGRESS) {
ucp_request_release(reqs[i]);
reqs[i] = reqs[--num_in_prog];
}
}
}
}

static ucs_status_t ucp_perf_test_exchange_status(ucx_perf_context_t *perf,
Expand Down Expand Up @@ -1567,7 +1585,7 @@ void uct_perf_barrier(ucx_perf_context_t *perf)
(void*)perf->uct.worker);
}

void ucp_perf_barrier(ucx_perf_context_t *perf)
void ucp_perf_thread_barrier(ucx_perf_context_t *perf)
{
rte_call(perf, barrier, (void(*)(void*))ucp_worker_progress,
#if _OPENMP
Expand All @@ -1577,6 +1595,11 @@ void ucp_perf_barrier(ucx_perf_context_t *perf)
#endif
}

/**
 * Barrier for a UCP perf context that supplies ucp_perf_worker_progress as the
 * progress routine, so the workers of all threads in @a perf are covered
 * rather than a single worker.
 *
 * @param [in] perf  Perf context; passed both to rte_call and as the argument
 *                   of the progress routine.
 */
void ucp_perf_barrier(ucx_perf_context_t *perf)
{
/* NOTE(review): rte_call is defined elsewhere; presumably it dispatches to the
 * RTE "barrier" callback, which invokes the progress routine while waiting -
 * confirm against the macro definition */
rte_call(perf, barrier, ucp_perf_worker_progress, perf);
}

static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf)
{
ucx_perf_params_t *params = &perf->params;
Expand Down Expand Up @@ -1815,7 +1838,7 @@ static struct {
[UCX_PERF_API_UCT] = {uct_perf_setup, uct_perf_cleanup,
uct_perf_test_dispatch, uct_perf_barrier},
[UCX_PERF_API_UCP] = {ucp_perf_setup, ucp_perf_cleanup,
ucp_perf_test_dispatch, ucp_perf_barrier}
ucp_perf_test_dispatch, ucp_perf_thread_barrier}
};

static ucs_status_t ucx_perf_thread_spawn(ucx_perf_context_t *perf,
Expand Down
1 change: 1 addition & 0 deletions src/tools/perf/lib/libperf_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ void ucx_perf_calc_result(ucx_perf_context_t *perf, ucx_perf_result_t *result);

void uct_perf_barrier(ucx_perf_context_t *perf);

void ucp_perf_thread_barrier(ucx_perf_context_t *perf);

void ucp_perf_barrier(ucx_perf_context_t *perf);

Expand Down
6 changes: 4 additions & 2 deletions src/uct/tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ enum {
* method. */
UCT_TCP_EP_FLAG_CONNECT_TO_EP = UCS_BIT(8),
/* EP is on EP PTR map. */
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(9)
UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(9),
/* EP has some operations done without flush */
UCT_TCP_EP_FLAG_NEED_FLUSH = UCS_BIT(10)
};


Expand Down Expand Up @@ -217,7 +219,7 @@ typedef enum uct_tcp_ep_am_id {
UCT_TCP_EP_PUT_REQ_AM_ID = UCT_AM_ID_MAX + 1,
/* AM ID reserved for TCP internal PUT ACK message */
UCT_TCP_EP_PUT_ACK_AM_ID = UCT_AM_ID_MAX + 2,
/* AM ID reserved for TCP internal PUT ACK message */
/* AM ID reserved for TCP internal keepalive message */
UCT_TCP_EP_KEEPALIVE_AM_ID = UCT_AM_ID_MAX + 3
} uct_tcp_ep_am_id_t;

Expand Down
64 changes: 45 additions & 19 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,29 @@ static inline ucs_status_t uct_tcp_ep_check_tx_res(uct_tcp_ep_t *ep)
return UCS_ERR_NO_RESOURCE;
}

/**
 * Take a buffer from a memory pool and attach it to an endpoint context.
 *
 * The context must not already own a buffer (checked by assertion).
 *
 * @param [in]    ep     Endpoint the context belongs to; used for diagnostics
 *                       only.
 * @param [inout] ctx    Context whose @a buf field receives the buffer.
 * @param [in]    mpool  Memory pool to take the buffer from.
 *
 * @return UCS_OK if a buffer was obtained, UCS_ERR_NO_MEMORY (after logging a
 *         warning) if the pool returned NULL.
 */
static inline ucs_status_t uct_tcp_ep_ctx_buf_alloc(uct_tcp_ep_t *ep,
                                                    uct_tcp_ep_ctx_t *ctx,
                                                    ucs_mpool_t *mpool)
{
    ucs_status_t result = UCS_OK;

    ucs_assertv(ctx->buf == NULL, "tcp_ep=%p", ep);

    ctx->buf = ucs_mpool_get_inline(mpool);
    if (ucs_unlikely(ctx->buf == NULL)) {
        /* Pool exhausted: report it and let the caller decide what to do */
        ucs_warn("tcp_ep %p: unable to get a buffer from %p memory pool", ep,
                 mpool);
        result = UCS_ERR_NO_MEMORY;
    }

    return result;
}

/**
 * Reset the position bookkeeping of an endpoint context: both the stored
 * length and the current offset are set back to zero.
 *
 * @param [inout] ctx  Context to rewind.
 */
static inline void uct_tcp_ep_ctx_rewind(uct_tcp_ep_ctx_t *ctx)
{
    ctx->length = 0;
    ctx->offset = 0;
}

static inline void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx)
static void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx)
{
ctx->put_sn = UINT32_MAX;
ctx->buf = NULL;
Expand Down Expand Up @@ -622,7 +638,8 @@ void uct_tcp_ep_replace_ep(uct_tcp_ep_t *to_ep, uct_tcp_ep_t *from_ep)
to_ep->flags |= from_ep->flags & (UCT_TCP_EP_FLAG_ZCOPY_TX |
UCT_TCP_EP_FLAG_PUT_RX |
UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK |
UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK);
UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK |
UCT_TCP_EP_FLAG_NEED_FLUSH);

if (uct_tcp_ep_ctx_buf_need_progress(&to_ep->rx)) {
/* If some data was already read, we have to process it */
Expand Down Expand Up @@ -1331,10 +1348,8 @@ static unsigned uct_tcp_ep_progress_am_rx(uct_tcp_ep_t *ep)
ucs_trace_func("ep=%p", ep);

if (!uct_tcp_ep_ctx_buf_need_progress(&ep->rx)) {
ucs_assert(ep->rx.buf == NULL);
ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
if (ucs_unlikely(ep->rx.buf == NULL)) {
ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
if (ucs_unlikely(uct_tcp_ep_ctx_buf_alloc(
ep, &ep->rx, &iface->rx_mpool) != UCS_OK)) {
return 0;
}

Expand Down Expand Up @@ -1442,15 +1457,14 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
return status;
}

ucs_assertv(ep->tx.buf == NULL, "ep=%p", ep);

ep->tx.buf = ucs_mpool_get_inline(&iface->tx_mpool);
if (ucs_unlikely(ep->tx.buf == NULL)) {
status = uct_tcp_ep_ctx_buf_alloc(ep, &ep->tx, &iface->tx_mpool);
if (ucs_unlikely(status != UCS_OK)) {
goto err_no_res;
}

*hdr = ep->tx.buf;
(*hdr)->am_id = am_id;
ep->flags |= UCT_TCP_EP_FLAG_NEED_FLUSH;

return UCS_OK;

Expand Down Expand Up @@ -1503,9 +1517,8 @@ static unsigned uct_tcp_ep_progress_magic_number_rx(void *arg)
uint64_t magic_number;

if (ep->rx.buf == NULL) {
ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
if (ucs_unlikely(ep->rx.buf == NULL)) {
ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
if (ucs_unlikely(uct_tcp_ep_ctx_buf_alloc(
ep, &ep->rx, &iface->rx_mpool) != UCS_OK)) {
return 0;
}
}
Expand Down Expand Up @@ -1669,8 +1682,8 @@ static void uct_tcp_ep_post_put_ack(uct_tcp_ep_t *ep)
/* Make sure that we are sending nothing through this EP at the moment.
* This check is needed to avoid mixing AM/PUT data sent from this EP
* and this PUT ACK message */
status = uct_tcp_ep_am_prepare(iface, ep,
UCT_TCP_EP_PUT_ACK_AM_ID, &hdr);
status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_PUT_ACK_AM_ID,
&hdr);
if (status != UCS_OK) {
if (status == UCS_ERR_NO_RESOURCE) {
ep->flags |= UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK;
Expand Down Expand Up @@ -2067,9 +2080,20 @@ ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
}

status = uct_tcp_ep_check_tx_res(ep);
if (status == UCS_ERR_NO_RESOURCE) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
return UCS_ERR_NO_RESOURCE;
if (ucs_unlikely(status != UCS_OK)) {
return status;
}

if (ep->flags & UCT_TCP_EP_FLAG_NEED_FLUSH) {
status = uct_tcp_ep_put_zcopy(&ep->super.super, NULL, 0, 0, 0,
NULL);
ucs_assert(status != UCS_ERR_NO_RESOURCE);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
return status;
}

ep->flags &= ~UCT_TCP_EP_FLAG_NEED_FLUSH;
ucs_assert(ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK);
}

if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) {
Expand All @@ -2078,6 +2102,7 @@ ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
return status;
}

UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
return UCS_INPROGRESS;
}

Expand All @@ -2095,7 +2120,8 @@ uct_tcp_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);

status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_KEEPALIVE_AM_ID, &hdr);
status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_KEEPALIVE_AM_ID,
&hdr);
if (status != UCS_OK) {
return (status == UCS_ERR_NO_RESOURCE) ? UCS_OK : status;
}
Expand Down
10 changes: 7 additions & 3 deletions test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,9 @@ bool ucp_test::check_tls(const std::string& tls)
ucp_test_base::entity::entity(const ucp_test_param& test_param,
ucp_config_t* ucp_config,
const ucp_worker_params_t& worker_params,
const ucp_test_base *test_owner)
: m_err_cntr(0), m_rejected_cntr(0), m_accept_err_cntr(0)
const ucp_test_base *test_owner) :
m_err_cntr(0), m_rejected_cntr(0), m_accept_err_cntr(0),
m_test(test_owner)
{
const int thread_type = test_param.variant.thread_type;
ucp_params_t local_ctx_params = test_param.variant.ctx_params;
Expand Down Expand Up @@ -971,7 +972,10 @@ void ucp_test_base::entity::ep_destructor(ucp_ep_h ep, entity *e)
ucs_status_t status;
ucp_tag_recv_info_t info;
do {
e->progress();
const ucp_test *test = dynamic_cast<const ucp_test*>(e->m_test);
ASSERT_TRUE(test != NULL);

test->progress();
status = ucp_request_test(req, &info);
} while (status == UCS_INPROGRESS);
EXPECT_EQ(UCS_OK, status);
Expand Down
1 change: 1 addition & 0 deletions test/gtest/ucp/ucp_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class ucp_test_base : public ucs::test_base {
size_t m_rejected_cntr;
size_t m_accept_err_cntr;
ucs::handle<ucp_ep_params_t*> m_server_ep_params;
const ucp_test_base *m_test;

private:
static void empty_send_completion(void *r, ucs_status_t status);
Expand Down

0 comments on commit 2a26ca6

Please sign in to comment.