Skip to content

Commit

Permalink
TCP/TEST: Fix simultaneous ep close with ucp_hello_world
Browse files Browse the repository at this point in the history
Fixes an assertion failure when running ucp_hello_world over TCP
transport.

UCP: When both sides close their TCP endpoints, one side can receive a
connection reset event while it is trying to close-flush its endpoint. We
should not try to invoke the user error callback in such a case. Instead, the
close operation should complete with status CONNECTION_RESET.

UCT/TCP: Need to purge outstanding PUT operations when getting an error.

Test: Run ucp_hello_world over several transports. Until now, it used
TCP only when run inside a Docker container, so the issue was not detected.
  • Loading branch information
yosefe committed Aug 11, 2021
1 parent d745991 commit 6956fc9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 29 deletions.
17 changes: 13 additions & 4 deletions contrib/test_jenkins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -504,19 +504,28 @@ run_ucp_hello() {

export UCX_KEEPALIVE_INTERVAL=1s
export UCX_KEEPALIVE_NUM_EPS=10
export UCX_LOG_LEVEL=info
export UCX_MM_ERROR_HANDLING=y

for test_mode in -w -f -b -erecv -esend -ekeepalive
for tls in all tcp shm
do
for mem_type in $mem_types_list
export UCX_TLS=${tls}
for test_mode in -w -f -b -erecv -esend -ekeepalive
do
echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ===="
run_hello ucp ${test_mode} -m ${mem_type}
for mem_type in $mem_types_list
do
echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ===="
run_hello ucp ${test_mode} -m ${mem_type}
done
done
done
rm -f ./ucp_hello_world

unset UCX_KEEPALIVE_INTERVAL
unset UCX_KEEPALIVE_NUM_EPS
unset UCX_LOG_LEVEL
unset UCX_TLS
unset UCX_MM_ERROR_HANDLING
}

#
Expand Down
13 changes: 10 additions & 3 deletions examples/ucp_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ static const ucp_tag_t tag = 0x1337a880u;
static const ucp_tag_t tag_mask = UINT64_MAX;
static const char *addr_msg_str = "UCX address message";
static const char *data_msg_str = "UCX data message";
static int print_config = 0;
static ucp_address_t *local_addr;
static ucp_address_t *peer_addr;

Expand Down Expand Up @@ -292,7 +293,7 @@ static int run_ucx_client(ucp_worker_h ucp_worker)
CHKERR_JUMP(status != UCS_OK, "test_poll_wait\n", err_ep);
}
}

if (err_handling_opt.failure_mode == FAILURE_MODE_KEEPALIVE) {
fprintf(stderr, "Emulating unexpected failure after receive completion "
"on client side, server should detect error by "
Expand Down Expand Up @@ -546,7 +547,9 @@ int main(int argc, char **argv)

status = ucp_init(&ucp_params, config, &ucp_context);

ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG);
if (print_config) {
ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG);
}

ucp_config_release(config);
CHKERR_JUMP(status != UCS_OK, "ucp_init\n", err);
Expand Down Expand Up @@ -641,6 +644,7 @@ static void print_usage()
"before receive completed\n");
fprintf(stderr, " keepalive - keepalive failure on client side "
"after communication completed\n");
fprintf(stderr, " -c Print UCP configuration\n");
print_common_help();
fprintf(stderr, "\n");
}
Expand All @@ -652,7 +656,7 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name)
err_handling_opt.ucp_err_mode = UCP_ERR_HANDLING_MODE_NONE;
err_handling_opt.failure_mode = FAILURE_MODE_NONE;

while ((c = getopt(argc, argv, "wfbe:n:p:s:m:h")) != -1) {
while ((c = getopt(argc, argv, "wfbe:n:p:s:m:ch")) != -1) {
switch (c) {
case 'w':
ucp_test_mode = TEST_MODE_WAIT;
Expand Down Expand Up @@ -699,6 +703,9 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name)
return UCS_ERR_UNSUPPORTED;
}
break;
case 'c':
print_config = 1;
break;
case 'h':
default:
print_usage();
Expand Down
18 changes: 10 additions & 8 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1077,13 +1077,14 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
ucp_stream_ep_cleanup(ucp_ep);

if (ucp_ep->flags & UCP_EP_FLAG_USED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_CLOSED);
/* Promote close operation to CANCEL in case of transport error,
if (ucp_ep->flags & UCP_EP_FLAG_CLOSED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
/* Promote close operation to CANCEL in case of transport error,
* since the disconnect event may never arrive. */
close_req = ep_ext_control->close_req.req;
close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL;
ucp_ep_local_disconnect_progress(close_req);
close_req = ep_ext_control->close_req.req;
close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL;
ucp_ep_local_disconnect_progress(close_req);
}
} else if (ep_ext_control->err_cb == NULL) {
/* Do not print error if connection reset by remote peer since it
* can be part of user level close protocol */
Expand Down Expand Up @@ -1388,7 +1389,8 @@ void ucp_ep_destroy(ucp_ep_h ep)
ucp_worker_progress(worker);
status = ucp_request_check_status(request);
} while (status == UCS_INPROGRESS);

ucs_debug("ep_close request %p completed with status %s", request,
ucs_status_string(status));
ucp_request_release(request);
}

Expand Down Expand Up @@ -3132,7 +3134,7 @@ static ucs_status_t ucp_ep_query_sockaddr(ucp_ep_h ep, ucp_ep_attr_t *attr)
return status;
}
}

if (uct_cm_ep_attr.field_mask & UCT_EP_ATTR_FIELD_REMOTE_SOCKADDR) {
status = ucs_sockaddr_copy((struct sockaddr*)&attr->remote_sockaddr,
(struct sockaddr*)&uct_cm_ep_attr.remote_address);
Expand Down
23 changes: 9 additions & 14 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,19 +370,22 @@ uct_tcp_ep_zcopy_completed(uct_tcp_ep_t *ep, uct_completion_t *comp,
}
}

static void uct_tcp_ep_purge(uct_tcp_ep_t *ep)
static void uct_tcp_ep_purge(uct_tcp_ep_t *ep, ucs_status_t status)
{
uct_tcp_ep_put_completion_t *put_comp;
uct_tcp_ep_zcopy_tx_t *ctx;

ucs_debug("tcp_ep %p: purge outstanding operations with status %s", ep,
ucs_status_string(status));

if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) {
ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf;
uct_tcp_ep_zcopy_completed(ep, ctx->comp, UCS_ERR_CANCELED);
uct_tcp_ep_zcopy_completed(ep, ctx->comp, status);
uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset);
}

ucs_queue_for_each_extract(put_comp, &ep->put_comp_q, elem, 1) {
uct_invoke_completion(put_comp->comp, UCS_ERR_CANCELED);
uct_invoke_completion(put_comp->comp, status);
ucs_mpool_put_inline(put_comp);
}
}
Expand All @@ -403,7 +406,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_tcp_ep_t)
}

uct_tcp_ep_remove_ctx_cap(self, UCT_TCP_EP_CTX_CAPS);
uct_tcp_ep_purge(self);
uct_tcp_ep_purge(self, UCS_ERR_CANCELED);

if (self->flags & UCT_TCP_EP_FLAG_FAILED) {
/* a failed EP callback can be still scheduled on the UCT worker,
Expand Down Expand Up @@ -447,7 +450,7 @@ void uct_tcp_ep_destroy(uct_ep_h tl_ep)
/* remove TX capability, but still will be able to receive data */
uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_FLAG_CTX_TYPE_TX);
/* purge all outstanding operations (GET/PUT Zcopy, flush operations) */
uct_tcp_ep_purge(ep);
uct_tcp_ep_purge(ep, UCS_ERR_CANCELED);
uct_tcp_cm_insert_ep(iface, ep);
} else {
uct_tcp_ep_destroy_internal(tl_ep);
Expand Down Expand Up @@ -947,7 +950,6 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
uct_tcp_ep_zcopy_tx_t *ctx;

ucs_debug("tcp_ep %p: remote disconnected", ep);

Expand All @@ -957,21 +959,14 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status
ep->flags &= ~UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK;
}

if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) {
/* There is ongoing AM/PUT Zcopy operation, need to notify
* the user about the error */
ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf;
uct_tcp_ep_zcopy_completed(ep, ctx->comp, status);
}
uct_tcp_ep_purge(ep, status);

if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) {
/* if the EP is waiting for the acknowledgment of the started
* PUT operation, decrease iface::outstanding counter */
uct_tcp_iface_outstanding_dec(iface);
ep->flags &= ~UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK;
}

uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset);
}

uct_tcp_ep_set_failed(ep);
Expand Down

0 comments on commit 6956fc9

Please sign in to comment.