Skip to content

Commit

Permalink
Merge pull request #4305 from dmitrygx/topic/v1.7.x/tcp_close_sock
Browse files (browse the repository at this point in the history)
UCT/TCP: Close TCP socket when unable to send data
  • Loading branch information
yosefe authored Oct 19, 2019
2 parents 9544fb9 + 3f334fb commit 7bf663f
Showing 1 changed file with 92 additions and 49 deletions.
141 changes: 92 additions & 49 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -472,69 +472,92 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep,
{
ucs_debug("tcp_ep %p: remote disconnected", ep);

uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVREAD);
uct_tcp_ep_ctx_reset(ctx);

if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) {
if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) {
if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_CTX_TYPE_RX);
uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVREAD);
} else {
/* If the EP supports RX only, destroy it */
uct_tcp_ep_destroy_internal(&ep->super.super);
}

uct_tcp_ep_mod_events(ep, 0, ep->events);
uct_tcp_ep_close_fd(&ep->fd);
} else if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
/* If the EP supports RX only, destroy it */
uct_tcp_ep_destroy_internal(&ep->super.super);
}
}

static inline unsigned uct_tcp_ep_send(uct_tcp_ep_t *ep, size_t *sent_length)
static inline ssize_t uct_tcp_ep_send(uct_tcp_ep_t *ep)
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
size_t sent_length;
ucs_status_t status;

*sent_length = ep->tx.length - ep->tx.offset;
ucs_assert(*sent_length > 0);
ucs_assert(ep->tx.length > ep->tx.offset);
sent_length = ep->tx.length - ep->tx.offset;

status = ucs_socket_send_nb(ep->fd, ep->tx.buf + ep->tx.offset,
sent_length, NULL, NULL);
if (status != UCS_OK) {
return 0;
status = ucs_socket_send_nb(ep->fd, UCS_PTR_BYTE_OFFSET(ep->tx.buf, ep->tx.offset),
&sent_length, NULL, NULL);
if (ucs_unlikely((status != UCS_OK) &&
(status != UCS_ERR_NO_PROGRESS))) {
return status;
}

iface->outstanding -= *sent_length;
ep->tx.offset += *sent_length;
iface->outstanding -= sent_length;
ep->tx.offset += sent_length;

ucs_assert(sent_length <= SSIZE_MAX);

return sent_length;
}

return (*sent_length > 0);
static inline void uct_tcp_ep_comp_zcopy(uct_tcp_ep_t *ep,
uct_completion_t *comp,
ucs_status_t status)
{
ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX);
if (comp != NULL) {
uct_invoke_completion(comp, status);
}
}

static inline unsigned uct_tcp_ep_sendv(uct_tcp_ep_t *ep, size_t *sent_length)
static inline ssize_t uct_tcp_ep_sendv(uct_tcp_ep_t *ep)
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
uct_tcp_ep_zcopy_ctx_t *ctx = (uct_tcp_ep_zcopy_ctx_t*)ep->tx.buf;
size_t sent_length;
ucs_status_t status;

ucs_assertv(ep->tx.offset < ep->tx.length, "ep=%p", ep);

status = ucs_socket_sendv_nb(ep->fd, &ctx->iov[ctx->iov_index],
ctx->iov_cnt - ctx->iov_index,
sent_length, NULL, NULL);
&sent_length, NULL, NULL);

if (ucs_unlikely(status != UCS_OK)) {
if (status == UCS_ERR_NO_PROGRESS) {
ucs_assert(sent_length == 0);
return 0;
}

uct_tcp_ep_comp_zcopy(ep, ctx->comp, status);
return status;
}

ep->tx.offset += *sent_length;
iface->outstanding -= *sent_length;
ep->tx.offset += sent_length;
iface->outstanding -= sent_length;

if ((ep->tx.offset != ep->tx.length) &&
((status == UCS_OK) || (status == UCS_ERR_NO_PROGRESS))) {
if (ep->tx.offset != ep->tx.length) {
ucs_iov_advance(ctx->iov, ctx->iov_cnt,
&ctx->iov_index, *sent_length);
&ctx->iov_index, sent_length);
} else {
ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX);
if (ctx->comp != NULL) {
uct_invoke_completion(ctx->comp, status);
}
uct_tcp_ep_comp_zcopy(ep, ctx->comp, UCS_OK);
}

return (*sent_length > 0);
ucs_assert(sent_length <= SSIZE_MAX);
return sent_length;
}

void uct_tcp_ep_dropped_connect_print_error(uct_tcp_ep_t *ep, int io_errno)
Expand Down Expand Up @@ -612,20 +635,23 @@ static inline unsigned uct_tcp_ep_recv(uct_tcp_ep_t *ep, size_t recv_length)

static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep)
{
unsigned count = 0;
size_t sent_length;
unsigned ret = 0;
ssize_t offset;

ucs_trace_func("ep=%p", ep);

if (uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
if (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX))) {
count += uct_tcp_ep_send(ep, &sent_length);
} else {
count += uct_tcp_ep_sendv(ep, &sent_length);
offset = (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX)) ?
uct_tcp_ep_send(ep) : uct_tcp_ep_sendv(ep));
if (ucs_unlikely(offset < 0)) {
uct_tcp_ep_handle_disconnected(ep, &ep->tx);
return 1;
}

ucs_trace_data("ep %p fd %d sent %zu/%zu bytes, moved to offest %zu",
ep, ep->fd, ep->tx.offset, ep->tx.length, sent_length);
ret = (offset > 0);

ucs_trace_data("ep %p fd %d sent %zu/%zu bytes, moved by offset %zd",
ep, ep->fd, ep->tx.offset, ep->tx.length, offset);

if (!uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
uct_tcp_ep_ctx_reset(&ep->tx);
Expand All @@ -634,15 +660,15 @@ static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep)

if (!ucs_queue_is_empty(&ep->pending_q)) {
uct_tcp_ep_pending_queue_dispatch(ep);
return count;
return ret;
}

if (uct_tcp_ep_ctx_buf_empty(&ep->tx)) {
ucs_assert(ucs_queue_is_empty(&ep->pending_q));
uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVWRITE);
}

return count;
return ret;
}

static inline void
Expand Down Expand Up @@ -766,7 +792,9 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
return UCS_OK;

err_no_res:
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
if (ep->fd != -1) {
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
}
UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
return UCS_ERR_NO_RESOURCE;
}
Expand Down Expand Up @@ -796,26 +824,32 @@ uct_tcp_ep_set_outstanding_zcopy(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
}

static inline void uct_tcp_ep_am_send(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
const uct_tcp_am_hdr_t *hdr)
static inline ucs_status_t
uct_tcp_ep_am_send(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
const uct_tcp_am_hdr_t *hdr)
{
size_t sent_length;
ssize_t offset;

ep->tx.length = sizeof(*hdr) + hdr->length;
iface->outstanding += ep->tx.length;

uct_tcp_ep_send(ep, &sent_length);
offset = uct_tcp_ep_send(ep);
if (ucs_unlikely(offset < 0)) {
return offset;
}

uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_SEND, hdr->am_id,
hdr + 1, hdr->length, "SEND: ep %p fd %d sent "
"%zu/%zu bytes, moved to offest %zu",
ep, ep->fd, ep->tx.offset, ep->tx.length, sent_length);
"%zu/%zu bytes, moved by offset %zd",
ep, ep->fd, ep->tx.offset, ep->tx.length, offset);

if (ucs_likely(!uct_tcp_ep_ctx_buf_need_progress(&ep->tx))) {
uct_tcp_ep_ctx_reset(&ep->tx);
} else {
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
}

return UCS_OK;
}

static const void*
Expand Down Expand Up @@ -856,7 +890,7 @@ uct_tcp_ep_am_sendv(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
uct_tcp_ep_am_sendv_get_trace_payload(hdr, header,
&iov[2], short_sendv),
hdr->length, "SEND: ep %p fd %d sent %zu/%zu bytes, "
"moved to offest %zu, iov cnt %zu "
"moved by offset %zu, iov cnt %zu "
"[addr %p len %zu] [addr %p len %zu]",
ep, ep->fd, ep->tx.offset, ep->tx.length,
ep->tx.offset, iov_cnt,
Expand Down Expand Up @@ -901,7 +935,12 @@ ucs_status_t uct_tcp_ep_am_short(uct_ep_h uct_ep, uint8_t am_id, uint64_t header

if (length <= iface->config.sendv_thresh) {
uct_am_short_fill_data(hdr + 1, header, payload, length);
uct_tcp_ep_am_send(iface, ep, hdr);
status = uct_tcp_ep_am_send(iface, ep, hdr);
if (ucs_unlikely(status != UCS_OK)) {
uct_tcp_ep_ctx_reset(&ep->tx);
return status;
}

UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, payload_length);
} else {
iov[0].iov_base = hdr;
Expand Down Expand Up @@ -963,7 +1002,11 @@ ssize_t uct_tcp_ep_am_bcopy(uct_ep_h uct_ep, uint8_t am_id,
* can be released inside `uct_tcp_ep_am_send` call */
hdr->length = payload_length = pack_cb(hdr + 1, arg);

uct_tcp_ep_am_send(iface, ep, hdr);
status = uct_tcp_ep_am_send(iface, ep, hdr);
if (ucs_unlikely(status != UCS_OK)) {
uct_tcp_ep_ctx_reset(&ep->tx);
return status;
}

UCT_TL_EP_STAT_OP(&ep->super, AM, BCOPY, payload_length);

Expand Down

0 comments on commit 7bf663f

Please sign in to comment.