Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/TCP: Close TCP socket when unable to send data #4305

Merged
merged 1 commit into from
Oct 19, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 92 additions & 49 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -472,69 +472,92 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep,
{
ucs_debug("tcp_ep %p: remote disconnected", ep);

uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVREAD);
uct_tcp_ep_ctx_reset(ctx);

if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) {
if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) {
if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_CTX_TYPE_RX);
uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVREAD);
} else {
/* If the EP supports RX only, destroy it */
uct_tcp_ep_destroy_internal(&ep->super.super);
}

uct_tcp_ep_mod_events(ep, 0, ep->events);
uct_tcp_ep_close_fd(&ep->fd);
} else if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
/* If the EP supports RX only, destroy it */
uct_tcp_ep_destroy_internal(&ep->super.super);
}
}

static inline unsigned uct_tcp_ep_send(uct_tcp_ep_t *ep, size_t *sent_length)
static inline ssize_t uct_tcp_ep_send(uct_tcp_ep_t *ep)
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
size_t sent_length;
ucs_status_t status;

*sent_length = ep->tx.length - ep->tx.offset;
ucs_assert(*sent_length > 0);
ucs_assert(ep->tx.length > ep->tx.offset);
sent_length = ep->tx.length - ep->tx.offset;

status = ucs_socket_send_nb(ep->fd, ep->tx.buf + ep->tx.offset,
sent_length, NULL, NULL);
if (status != UCS_OK) {
return 0;
status = ucs_socket_send_nb(ep->fd, UCS_PTR_BYTE_OFFSET(ep->tx.buf, ep->tx.offset),
&sent_length, NULL, NULL);
if (ucs_unlikely((status != UCS_OK) &&
(status != UCS_ERR_NO_PROGRESS))) {
return status;
}

iface->outstanding -= *sent_length;
ep->tx.offset += *sent_length;
iface->outstanding -= sent_length;
ep->tx.offset += sent_length;

ucs_assert(sent_length <= SSIZE_MAX);

return sent_length;
}

return (*sent_length > 0);
static inline void uct_tcp_ep_comp_zcopy(uct_tcp_ep_t *ep,
uct_completion_t *comp,
ucs_status_t status)
{
ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX);
if (comp != NULL) {
uct_invoke_completion(comp, status);
}
}

static inline unsigned uct_tcp_ep_sendv(uct_tcp_ep_t *ep, size_t *sent_length)
static inline ssize_t uct_tcp_ep_sendv(uct_tcp_ep_t *ep)
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
uct_tcp_ep_zcopy_ctx_t *ctx = (uct_tcp_ep_zcopy_ctx_t*)ep->tx.buf;
size_t sent_length;
ucs_status_t status;

ucs_assertv(ep->tx.offset < ep->tx.length, "ep=%p", ep);

status = ucs_socket_sendv_nb(ep->fd, &ctx->iov[ctx->iov_index],
ctx->iov_cnt - ctx->iov_index,
sent_length, NULL, NULL);
&sent_length, NULL, NULL);

if (ucs_unlikely(status != UCS_OK)) {
if (status == UCS_ERR_NO_PROGRESS) {
ucs_assert(sent_length == 0);
return 0;
}

uct_tcp_ep_comp_zcopy(ep, ctx->comp, status);
return status;
}

ep->tx.offset += *sent_length;
iface->outstanding -= *sent_length;
ep->tx.offset += sent_length;
iface->outstanding -= sent_length;

if ((ep->tx.offset != ep->tx.length) &&
((status == UCS_OK) || (status == UCS_ERR_NO_PROGRESS))) {
if (ep->tx.offset != ep->tx.length) {
ucs_iov_advance(ctx->iov, ctx->iov_cnt,
&ctx->iov_index, *sent_length);
&ctx->iov_index, sent_length);
} else {
ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX);
if (ctx->comp != NULL) {
uct_invoke_completion(ctx->comp, status);
}
uct_tcp_ep_comp_zcopy(ep, ctx->comp, UCS_OK);
}

return (*sent_length > 0);
ucs_assert(sent_length <= SSIZE_MAX);
return sent_length;
}

void uct_tcp_ep_dropped_connect_print_error(uct_tcp_ep_t *ep, int io_errno)
Expand Down Expand Up @@ -612,20 +635,23 @@ static inline unsigned uct_tcp_ep_recv(uct_tcp_ep_t *ep, size_t recv_length)

static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep)
{
unsigned count = 0;
size_t sent_length;
unsigned ret = 0;
ssize_t offset;

ucs_trace_func("ep=%p", ep);

if (uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
if (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX))) {
count += uct_tcp_ep_send(ep, &sent_length);
} else {
count += uct_tcp_ep_sendv(ep, &sent_length);
offset = (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX)) ?
uct_tcp_ep_send(ep) : uct_tcp_ep_sendv(ep));
if (ucs_unlikely(offset < 0)) {
uct_tcp_ep_handle_disconnected(ep, &ep->tx);
return 1;
}

ucs_trace_data("ep %p fd %d sent %zu/%zu bytes, moved to offest %zu",
ep, ep->fd, ep->tx.offset, ep->tx.length, sent_length);
ret = (offset > 0);

ucs_trace_data("ep %p fd %d sent %zu/%zu bytes, moved by offset %zd",
ep, ep->fd, ep->tx.offset, ep->tx.length, offset);

if (!uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
uct_tcp_ep_ctx_reset(&ep->tx);
Expand All @@ -634,15 +660,15 @@ static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep)

if (!ucs_queue_is_empty(&ep->pending_q)) {
uct_tcp_ep_pending_queue_dispatch(ep);
return count;
return ret;
}

if (uct_tcp_ep_ctx_buf_empty(&ep->tx)) {
ucs_assert(ucs_queue_is_empty(&ep->pending_q));
uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVWRITE);
}

return count;
return ret;
}

static inline void
Expand Down Expand Up @@ -766,7 +792,9 @@ uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
return UCS_OK;

err_no_res:
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
if (ep->fd != -1) {
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
}
UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
return UCS_ERR_NO_RESOURCE;
}
Expand Down Expand Up @@ -796,26 +824,32 @@ uct_tcp_ep_set_outstanding_zcopy(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
}

static inline void uct_tcp_ep_am_send(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
const uct_tcp_am_hdr_t *hdr)
static inline ucs_status_t
uct_tcp_ep_am_send(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
const uct_tcp_am_hdr_t *hdr)
{
size_t sent_length;
ssize_t offset;

ep->tx.length = sizeof(*hdr) + hdr->length;
iface->outstanding += ep->tx.length;

uct_tcp_ep_send(ep, &sent_length);
offset = uct_tcp_ep_send(ep);
if (ucs_unlikely(offset < 0)) {
return offset;
}

uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_SEND, hdr->am_id,
hdr + 1, hdr->length, "SEND: ep %p fd %d sent "
"%zu/%zu bytes, moved to offest %zu",
ep, ep->fd, ep->tx.offset, ep->tx.length, sent_length);
"%zu/%zu bytes, moved by offset %zd",
ep, ep->fd, ep->tx.offset, ep->tx.length, offset);

if (ucs_likely(!uct_tcp_ep_ctx_buf_need_progress(&ep->tx))) {
uct_tcp_ep_ctx_reset(&ep->tx);
} else {
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
}

return UCS_OK;
}

static const void*
Expand Down Expand Up @@ -856,7 +890,7 @@ uct_tcp_ep_am_sendv(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
uct_tcp_ep_am_sendv_get_trace_payload(hdr, header,
&iov[2], short_sendv),
hdr->length, "SEND: ep %p fd %d sent %zu/%zu bytes, "
"moved to offest %zu, iov cnt %zu "
"moved by offset %zu, iov cnt %zu "
"[addr %p len %zu] [addr %p len %zu]",
ep, ep->fd, ep->tx.offset, ep->tx.length,
ep->tx.offset, iov_cnt,
Expand Down Expand Up @@ -901,7 +935,12 @@ ucs_status_t uct_tcp_ep_am_short(uct_ep_h uct_ep, uint8_t am_id, uint64_t header

if (length <= iface->config.sendv_thresh) {
uct_am_short_fill_data(hdr + 1, header, payload, length);
uct_tcp_ep_am_send(iface, ep, hdr);
status = uct_tcp_ep_am_send(iface, ep, hdr);
if (ucs_unlikely(status != UCS_OK)) {
uct_tcp_ep_ctx_reset(&ep->tx);
return status;
}

UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, payload_length);
} else {
iov[0].iov_base = hdr;
Expand Down Expand Up @@ -963,7 +1002,11 @@ ssize_t uct_tcp_ep_am_bcopy(uct_ep_h uct_ep, uint8_t am_id,
* can be released inside `uct_tcp_ep_am_send` call */
hdr->length = payload_length = pack_cb(hdr + 1, arg);

uct_tcp_ep_am_send(iface, ep, hdr);
status = uct_tcp_ep_am_send(iface, ep, hdr);
if (ucs_unlikely(status != UCS_OK)) {
uct_tcp_ep_ctx_reset(&ep->tx);
return status;
}

UCT_TL_EP_STAT_OP(&ep->super, AM, BCOPY, payload_length);

Expand Down