Skip to content

Commit

Permalink
UCT/TCP: Optimize condition to check TX resources
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Jul 29, 2021
1 parent 6e8238b commit 5a53785
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 21 deletions.
13 changes: 13 additions & 0 deletions src/uct/tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@
/* Maximal value for connection sequence number */
#define UCT_TCP_CM_CONN_SN_MAX UINT64_MAX

/* Endpoint can do TX operations if 3 conditions are true:
* - endpoint is connected to a peer
* - TX buffer is empty
* - number of PUT operations done is not equal to UINT32_MAX */
#define UCT_TCP_EP_TX_DO_MAX 3

/* Endpoint can do RX operations if 1 condition is true:
* - RX buffer is empty */
#define UCT_TCP_EP_RX_DO_MAX 1

/* The seconds the connection needs to remain idle before TCP starts sending
* keepalive probes */
#define UCT_TCP_EP_DEFAULT_KEEPALIVE_IDLE 10
Expand Down Expand Up @@ -256,6 +266,9 @@ typedef struct uct_tcp_ep_put_completion {
* TCP endpoint communication context
*/
typedef struct uct_tcp_ep_ctx {
uint8_t unable_do_cnt; /* If the value is 0, it indicates
* the endpoint can perform TX or RX
* operations */
uint32_t sn; /* Sequence number of last sent
* TX operation or received PUT
* operation */
Expand Down
8 changes: 8 additions & 0 deletions src/uct/tcp/tcp_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ void uct_tcp_cm_change_conn_state(uct_tcp_ep_t *ep,
}
break;
case UCT_TCP_EP_CONN_STATE_CONNECTED:
ucs_assert(ep->tx.unable_do_cnt > 0);
--ep->tx.unable_do_cnt;

/* old_conn_state could be CONNECTING: this may happen when a peer is going
* to use this EP with a socket from an accepted connection while handling
* simultaneous connection establishment */
Expand Down Expand Up @@ -75,6 +78,11 @@ void uct_tcp_cm_change_conn_state(uct_tcp_ep_t *ep,
/* Since ep::peer_addr is 0'ed, we have to print w/o peer's address */
full_log = 0;
}

if (old_conn_state == UCT_TCP_EP_CONN_STATE_CONNECTED) {
ucs_assert(ep->tx.unable_do_cnt < UCT_TCP_EP_TX_DO_MAX);
++ep->tx.unable_do_cnt;
}
break;
case UCT_TCP_EP_CONN_STATE_ACCEPTING:
ucs_assert((old_conn_state == UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER) ||
Expand Down
84 changes: 63 additions & 21 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ static inline int uct_tcp_ep_ctx_buf_need_progress(uct_tcp_ep_ctx_t *ctx)

static inline ucs_status_t uct_tcp_ep_check_tx_res(uct_tcp_ep_t *ep)
{
if (ucs_likely((ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTED) &&
if (ucs_likely(ep->tx.unable_do_cnt == 0)) {
ucs_assert((ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTED) &&
uct_tcp_ep_ctx_buf_empty(&ep->tx) &&
(ep->put_cnt != UINT32_MAX))) {
(ep->put_cnt != UINT32_MAX));
return UCS_OK;
} else if (ucs_unlikely(ep->conn_state == UCT_TCP_EP_CONN_STATE_CLOSED)) {
return UCS_ERR_CONNECTION_RESET;
Expand All @@ -94,21 +95,57 @@ static inline ucs_status_t uct_tcp_ep_check_tx_res(uct_tcp_ep_t *ep)
return UCS_ERR_NO_RESOURCE;
}

/**
 * Attach a buffer from the interface memory pool to the TX or RX context of
 * the endpoint.
 *
 * @param ep        Endpoint whose context receives the buffer.
 * @param ctx_type  UCT_TCP_EP_FLAG_CTX_TYPE_TX selects the TX context and the
 *                  TX memory pool; any other value is asserted to be
 *                  UCT_TCP_EP_FLAG_CTX_TYPE_RX and selects the RX context and
 *                  the RX memory pool.
 *
 * @return UCS_OK if a buffer was obtained, UCS_ERR_NO_MEMORY if the memory
 *         pool returned NULL (a warning is printed in this case).
 *
 * On success the context's unable_do_cnt is incremented, since a context that
 * holds a buffer no longer satisfies the "buffer is empty" condition. On
 * failure the counter is left untouched.
 */
static inline ucs_status_t uct_tcp_ep_ctx_buf_alloc(uct_tcp_ep_t *ep,
                                                    uint8_t ctx_type)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    int is_tx              = (ctx_type == UCT_TCP_EP_FLAG_CTX_TYPE_TX);
    uct_tcp_ep_ctx_t *ctx  = is_tx ? &ep->tx : &ep->rx;
    ucs_mpool_t *mpool     = is_tx ? &iface->tx_mpool : &iface->rx_mpool;

    /* The counter must still have room for the increment done below */
    if (is_tx) {
        ucs_assert(ctx->unable_do_cnt < UCT_TCP_EP_TX_DO_MAX);
    } else {
        ucs_assert(ctx_type == UCT_TCP_EP_FLAG_CTX_TYPE_RX);
        ucs_assert(ctx->unable_do_cnt < UCT_TCP_EP_RX_DO_MAX);
    }

    /* A context must not already own a buffer */
    ucs_assertv(ctx->buf == NULL, "tcp_ep=%p", ep);

    ctx->buf = ucs_mpool_get_inline(mpool);
    if (ucs_unlikely(ctx->buf == NULL)) {
        ucs_warn("tcp_ep %p: unable to get a buffer from %p memory pool", ep,
                 mpool);
        return UCS_ERR_NO_MEMORY;
    }

    /* Count the attached buffer only after the allocation succeeded */
    ++ctx->unable_do_cnt;
    return UCS_OK;
}

/**
 * Reset the position tracking of a communication context: both the length
 * and the offset fields are set to 0. The buffer pointer is not touched.
 *
 * @param ctx  Context to rewind.
 */
static inline void uct_tcp_ep_ctx_rewind(uct_tcp_ep_ctx_t *ctx)
{
    ctx->length = 0;
    ctx->offset = 0;
}

static inline void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx)
static void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx, uint8_t unable_do_init)
{
ctx->sn = UINT32_MAX;
ctx->buf = NULL;
ctx->unable_do_cnt = unable_do_init;
ctx->sn = UINT32_MAX;
ctx->buf = NULL;
uct_tcp_ep_ctx_rewind(ctx);
}

static inline void uct_tcp_ep_ctx_reset(uct_tcp_ep_ctx_t *ctx)
{
ucs_assert(ctx->unable_do_cnt > 0);
--ctx->unable_do_cnt;

ucs_mpool_put_inline(ctx->buf);
ctx->buf = NULL;
uct_tcp_ep_ctx_rewind(ctx);
Expand Down Expand Up @@ -236,8 +273,8 @@ static UCS_CLASS_INIT_FUNC(uct_tcp_ep_t, uct_tcp_iface_t *iface,

uct_tcp_ep_addr_init(&self->peer_addr, dest_addr);

uct_tcp_ep_ctx_init(&self->tx);
uct_tcp_ep_ctx_init(&self->rx);
uct_tcp_ep_ctx_init(&self->tx, 1 /* not connected */);
uct_tcp_ep_ctx_init(&self->rx, 0);

self->events = 0;
self->conn_retries = 0;
Expand Down Expand Up @@ -902,6 +939,11 @@ static inline void uct_tcp_ep_handle_put_ack(uct_tcp_ep_t *ep,
uct_tcp_ep_put_completion_t *put_comp;

ucs_assert(ep->put_cnt != 0);
if (ucs_unlikely(ep->put_cnt == UINT32_MAX)) {
ucs_assert(ep->tx.unable_do_cnt > 0);
--ep->tx.unable_do_cnt;
}

if (--ep->put_cnt == 0) {
uct_tcp_iface_outstanding_dec(iface);
}
Expand Down Expand Up @@ -1333,10 +1375,8 @@ static unsigned uct_tcp_ep_progress_am_rx(uct_tcp_ep_t *ep)
ucs_trace_func("ep=%p", ep);

if (!uct_tcp_ep_ctx_buf_need_progress(&ep->rx)) {
ucs_assert(ep->rx.buf == NULL);
ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
if (ucs_unlikely(ep->rx.buf == NULL)) {
ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
if (ucs_unlikely(uct_tcp_ep_ctx_buf_alloc(
ep, UCT_TCP_EP_FLAG_CTX_TYPE_RX) != UCS_OK)) {
return 0;
}

Expand Down Expand Up @@ -1444,10 +1484,8 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
return status;
}

ucs_assertv(ep->tx.buf == NULL, "ep=%p", ep);

ep->tx.buf = ucs_mpool_get_inline(&iface->tx_mpool);
if (ucs_unlikely(ep->tx.buf == NULL)) {
status = uct_tcp_ep_ctx_buf_alloc(ep, UCT_TCP_EP_FLAG_CTX_TYPE_TX);
if (ucs_unlikely(status != UCS_OK)) {
goto err_no_res;
}

Expand Down Expand Up @@ -1504,18 +1542,17 @@ static unsigned uct_tcp_ep_progress_data_rx(void *arg)

static unsigned uct_tcp_ep_progress_magic_number_rx(void *arg)
{
uct_tcp_ep_t *ep = (uct_tcp_ep_t*)arg;
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
uct_tcp_ep_t *ep = (uct_tcp_ep_t*)arg;
uct_tcp_iface_t UCS_V_UNUSED *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
char str_local_addr[UCS_SOCKADDR_STRING_LEN];
char str_remote_addr[UCS_SOCKADDR_STRING_LEN];
size_t recv_length, prev_length;
uint64_t magic_number;

if (ep->rx.buf == NULL) {
ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
if (ucs_unlikely(ep->rx.buf == NULL)) {
ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
if (ucs_unlikely(uct_tcp_ep_ctx_buf_alloc(
ep, UCT_TCP_EP_FLAG_CTX_TYPE_RX) != UCS_OK)) {
return 0;
}
}
Expand Down Expand Up @@ -2018,6 +2055,11 @@ ucs_status_t uct_tcp_ep_put_zcopy(uct_ep_h uct_ep, const uct_iov_t *iov,
* ACK message receiving if there are no other PUT operations in-flight
*/
uct_tcp_iface_outstanding_inc(iface);

if (ucs_unlikely(ep->put_cnt == UINT32_MAX)) {
ucs_assert(ep->tx.unable_do_cnt > 0);
--ep->tx.unable_do_cnt;
}
}

UCT_TL_EP_STAT_OP(&ep->super, PUT, ZCOPY, put_req.length);
Expand Down

0 comments on commit 5a53785

Please sign in to comment.