From 3f334fbaba558fcc93b1ae4c0a850efc8c273036 Mon Sep 17 00:00:00 2001 From: Dmitry Gladkov Date: Wed, 2 Oct 2019 20:34:49 +0300 Subject: [PATCH] UCT/TCP: Close TCP socket when unable to send data Conflicts: src/uct/tcp/tcp_ep.c --- src/uct/tcp/tcp_ep.c | 141 ++++++++++++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 49 deletions(-) diff --git a/src/uct/tcp/tcp_ep.c b/src/uct/tcp/tcp_ep.c index e9433d195c5..6d04c25a85e 100644 --- a/src/uct/tcp/tcp_ep.c +++ b/src/uct/tcp/tcp_ep.c @@ -472,69 +472,92 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, { ucs_debug("tcp_ep %p: remote disconnected", ep); - uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVREAD); uct_tcp_ep_ctx_reset(ctx); - if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) { - if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) { + if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) { + if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) { uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_CTX_TYPE_RX); - uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVREAD); - } else { - /* If the EP supports RX only, destroy it */ - uct_tcp_ep_destroy_internal(&ep->super.super); } + + uct_tcp_ep_mod_events(ep, 0, ep->events); + uct_tcp_ep_close_fd(&ep->fd); + } else if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) { + /* If the EP supports RX only, destroy it */ + uct_tcp_ep_destroy_internal(&ep->super.super); } } -static inline unsigned uct_tcp_ep_send(uct_tcp_ep_t *ep, size_t *sent_length) +static inline ssize_t uct_tcp_ep_send(uct_tcp_ep_t *ep) { uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_tcp_iface_t); + size_t sent_length; ucs_status_t status; - *sent_length = ep->tx.length - ep->tx.offset; - ucs_assert(*sent_length > 0); + ucs_assert(ep->tx.length > ep->tx.offset); + sent_length = ep->tx.length - ep->tx.offset; - status = ucs_socket_send_nb(ep->fd, ep->tx.buf + ep->tx.offset, - sent_length, NULL, NULL); - if (status != UCS_OK) { - return 0; + status = ucs_socket_send_nb(ep->fd, UCS_PTR_BYTE_OFFSET(ep->tx.buf, ep->tx.offset), + &sent_length, NULL, NULL); + if (ucs_unlikely((status != UCS_OK) && + (status != UCS_ERR_NO_PROGRESS))) { + return status; } - iface->outstanding -= *sent_length; - ep->tx.offset += *sent_length; + iface->outstanding -= sent_length; + ep->tx.offset += sent_length; + + ucs_assert(sent_length <= SSIZE_MAX); + + return sent_length; +} - return (*sent_length > 0); +static inline void uct_tcp_ep_comp_zcopy(uct_tcp_ep_t *ep, + uct_completion_t *comp, + ucs_status_t status) +{ + ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX); + if (comp != NULL) { + uct_invoke_completion(comp, status); + } } -static inline unsigned uct_tcp_ep_sendv(uct_tcp_ep_t *ep, size_t *sent_length) +static inline ssize_t uct_tcp_ep_sendv(uct_tcp_ep_t *ep) { uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_tcp_iface_t); uct_tcp_ep_zcopy_ctx_t *ctx = (uct_tcp_ep_zcopy_ctx_t*)ep->tx.buf; + size_t sent_length; ucs_status_t status; ucs_assertv(ep->tx.offset < ep->tx.length, "ep=%p", ep); status = ucs_socket_sendv_nb(ep->fd, &ctx->iov[ctx->iov_index], ctx->iov_cnt - ctx->iov_index, - sent_length, NULL, NULL); + &sent_length, NULL, NULL); + + if (ucs_unlikely(status != UCS_OK)) { + if (status == UCS_ERR_NO_PROGRESS) { + ucs_assert(sent_length == 0); + return 0; + } + + uct_tcp_ep_comp_zcopy(ep, ctx->comp, status); + return status; + } - ep->tx.offset += *sent_length; - iface->outstanding -= *sent_length; + ep->tx.offset += sent_length; + iface->outstanding -= sent_length; - if ((ep->tx.offset != ep->tx.length) && - ((status == UCS_OK) || (status == UCS_ERR_NO_PROGRESS))) { + if (ep->tx.offset != ep->tx.length) { ucs_iov_advance(ctx->iov, ctx->iov_cnt, - &ctx->iov_index, *sent_length); + &ctx->iov_index, sent_length); } else { - ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX); - if (ctx->comp != NULL) { - uct_invoke_completion(ctx->comp, status); - } + uct_tcp_ep_comp_zcopy(ep, ctx->comp, UCS_OK); } - return (*sent_length > 0); + ucs_assert(sent_length <= SSIZE_MAX); + return sent_length; } void uct_tcp_ep_dropped_connect_print_error(uct_tcp_ep_t *ep, int io_errno) @@ -612,20 +635,23 @@ static inline unsigned uct_tcp_ep_recv(uct_tcp_ep_t *ep, size_t recv_length) static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep) { - unsigned count = 0; - size_t sent_length; + unsigned ret = 0; + ssize_t offset; ucs_trace_func("ep=%p", ep); if (uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) { - if (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX))) { - count += uct_tcp_ep_send(ep, &sent_length); - } else { - count += uct_tcp_ep_sendv(ep, &sent_length); + offset = (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX)) ? + uct_tcp_ep_send(ep) : uct_tcp_ep_sendv(ep)); + if (ucs_unlikely(offset < 0)) { + uct_tcp_ep_handle_disconnected(ep, &ep->tx); + return 1; } - ucs_trace_data("ep %p fd %d sent %zu/%zu bytes, moved to offest %zu", - ep, ep->fd, ep->tx.offset, ep->tx.length, sent_length); + ret = (offset > 0); + + ucs_trace_data("ep %p fd %d sent %zu/%zu bytes, moved by offset %zd", + ep, ep->fd, ep->tx.offset, ep->tx.length, offset); if (!uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) { uct_tcp_ep_ctx_reset(&ep->tx); @@ -634,7 +660,7 @@ static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep) if (!ucs_queue_is_empty(&ep->pending_q)) { uct_tcp_ep_pending_queue_dispatch(ep); - return count; + return ret; } if (uct_tcp_ep_ctx_buf_empty(&ep->tx)) { @@ -642,7 +668,7 @@ static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep) uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVWRITE); } - return count; + return ret; } static inline void @@ -766,7 +792,9 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep, return UCS_OK; err_no_res: - uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0); + if (ep->fd != -1) { + uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0); + } UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1); return UCS_ERR_NO_RESOURCE; } @@ -796,26 +824,32 @@ uct_tcp_ep_set_outstanding_zcopy(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep, uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0); } -static inline void uct_tcp_ep_am_send(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep, - const uct_tcp_am_hdr_t *hdr) +static inline ucs_status_t +uct_tcp_ep_am_send(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep, + const uct_tcp_am_hdr_t *hdr) { - size_t sent_length; + ssize_t offset; ep->tx.length = sizeof(*hdr) + hdr->length; iface->outstanding += ep->tx.length; - uct_tcp_ep_send(ep, &sent_length); + offset = uct_tcp_ep_send(ep); + if (ucs_unlikely(offset < 0)) { + return offset; + } uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_SEND, hdr->am_id, hdr + 1, hdr->length, "SEND: ep %p fd %d sent " - "%zu/%zu bytes, moved to offest %zu", - ep, ep->fd, ep->tx.offset, ep->tx.length, sent_length); + "%zu/%zu bytes, moved by offset %zd", + ep, ep->fd, ep->tx.offset, ep->tx.length, offset); if (ucs_likely(!uct_tcp_ep_ctx_buf_need_progress(&ep->tx))) { uct_tcp_ep_ctx_reset(&ep->tx); } else { uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0); } + + return UCS_OK; } static const void* @@ -856,7 +890,7 @@ uct_tcp_ep_am_sendv(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep, uct_tcp_ep_am_sendv_get_trace_payload(hdr, header, &iov[2], short_sendv), hdr->length, "SEND: ep %p fd %d sent %zu/%zu bytes, " - "moved to offest %zu, iov cnt %zu " + "moved by offset %zu, iov cnt %zu " "[addr %p len %zu] [addr %p len %zu]", ep, ep->fd, ep->tx.offset, ep->tx.length, ep->tx.offset, iov_cnt, @@ -901,7 +935,12 @@ ucs_status_t uct_tcp_ep_am_short(uct_ep_h uct_ep, uint8_t am_id, uint64_t header if (length <= iface->config.sendv_thresh) { uct_am_short_fill_data(hdr + 1, header, payload, length); - uct_tcp_ep_am_send(iface, ep, hdr); + status = uct_tcp_ep_am_send(iface, ep, hdr); + if (ucs_unlikely(status != UCS_OK)) { + uct_tcp_ep_ctx_reset(&ep->tx); + return status; + } + UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, payload_length); } else { iov[0].iov_base = hdr; @@ -963,7 +1002,11 @@ ssize_t uct_tcp_ep_am_bcopy(uct_ep_h uct_ep, uint8_t am_id, * can be released inside `uct_tcp_ep_am_send` call */ hdr->length = payload_length = pack_cb(hdr + 1, arg); - uct_tcp_ep_am_send(iface, ep, hdr); + status = uct_tcp_ep_am_send(iface, ep, hdr); + if (ucs_unlikely(status != UCS_OK)) { + uct_tcp_ep_ctx_reset(&ep->tx); + return status; + } UCT_TL_EP_STAT_OP(&ep->super, AM, BCOPY, payload_length);