Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCP/TEST: Fix simultaneous ep close with ucp_hello_world #7224

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions contrib/test_jenkins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -504,19 +504,26 @@ run_ucp_hello() {

export UCX_KEEPALIVE_INTERVAL=1s
export UCX_KEEPALIVE_NUM_EPS=10
export UCX_LOG_LEVEL=info

for test_mode in -w -f -b -erecv -esend -ekeepalive
for tls in all tcp,cuda
do
for mem_type in $mem_types_list
export UCX_TLS=${tls}
for test_mode in -w -f -b -erecv -esend -ekeepalive
do
echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ===="
run_hello ucp ${test_mode} -m ${mem_type}
for mem_type in $mem_types_list
do
echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ===="
run_hello ucp ${test_mode} -m ${mem_type}
done
done
done
rm -f ./ucp_hello_world

unset UCX_KEEPALIVE_INTERVAL
unset UCX_KEEPALIVE_NUM_EPS
unset UCX_LOG_LEVEL
unset UCX_TLS
}

#
Expand Down
13 changes: 12 additions & 1 deletion examples/hello_world_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <ucs/memory/memory_type.h>

#include <sys/poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down Expand Up @@ -265,8 +266,10 @@ int client_connect(const char *server, uint16_t server_port)
return -1;
}

static inline int barrier(int oob_sock)
static inline int
barrier(int oob_sock, void (*progress_cb)(void *arg), void *arg)
{
struct pollfd pfd;
int dummy = 0;
ssize_t res;

Expand All @@ -275,6 +278,14 @@ static inline int barrier(int oob_sock)
return res;
}

pfd.fd = oob_sock;
pfd.events = POLLIN;
pfd.revents = 0;
do {
res = poll(&pfd, 1, 1);
progress_cb(arg);
} while (res != 1);

res = recv(oob_sock, &dummy, sizeof(dummy), MSG_WAITALL);

/* number of received bytes should be the same as sent */
Expand Down
22 changes: 17 additions & 5 deletions examples/ucp_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ static const ucp_tag_t tag = 0x1337a880u;
static const ucp_tag_t tag_mask = UINT64_MAX;
static const char *addr_msg_str = "UCX address message";
static const char *data_msg_str = "UCX data message";
static int print_config = 0;
static ucp_address_t *local_addr;
static ucp_address_t *peer_addr;

Expand Down Expand Up @@ -292,7 +293,7 @@ static int run_ucx_client(ucp_worker_h ucp_worker)
CHKERR_JUMP(status != UCS_OK, "test_poll_wait\n", err_ep);
}
}

if (err_handling_opt.failure_mode == FAILURE_MODE_KEEPALIVE) {
fprintf(stderr, "Emulating unexpected failure after receive completion "
"on client side, server should detect error by "
Expand Down Expand Up @@ -505,6 +506,11 @@ static int run_test(const char *client_target_name, ucp_worker_h ucp_worker)
}
}

static void progress_worker(void *arg)
{
ucp_worker_progress((ucp_worker_h)arg);
}

int main(int argc, char **argv)
{
/* UCP temporary vars */
Expand Down Expand Up @@ -546,7 +552,9 @@ int main(int argc, char **argv)

status = ucp_init(&ucp_params, config, &ucp_context);

ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG);
if (print_config) {
ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG);
}

ucp_config_release(config);
CHKERR_JUMP(status != UCS_OK, "ucp_init\n", err);
Expand Down Expand Up @@ -597,9 +605,9 @@ int main(int argc, char **argv)

ret = run_test(client_target_name, ucp_worker);

if (!ret && (err_handling_opt.failure_mode != FAILURE_MODE_NONE)) {
if (!ret && (err_handling_opt.failure_mode == FAILURE_MODE_NONE)) {
/* Make sure remote is disconnected before destroying local worker */
ret = barrier(oob_sock);
ret = barrier(oob_sock, progress_worker, ucp_worker);
}
close(oob_sock);

Expand Down Expand Up @@ -641,6 +649,7 @@ static void print_usage()
"before receive completed\n");
fprintf(stderr, " keepalive - keepalive failure on client side "
"after communication completed\n");
fprintf(stderr, " -c Print UCP configuration\n");
evgeny-leksikov marked this conversation as resolved.
Show resolved Hide resolved
print_common_help();
fprintf(stderr, "\n");
}
Expand All @@ -652,7 +661,7 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name)
err_handling_opt.ucp_err_mode = UCP_ERR_HANDLING_MODE_NONE;
err_handling_opt.failure_mode = FAILURE_MODE_NONE;

while ((c = getopt(argc, argv, "wfbe:n:p:s:m:h")) != -1) {
while ((c = getopt(argc, argv, "wfbe:n:p:s:m:ch")) != -1) {
switch (c) {
case 'w':
ucp_test_mode = TEST_MODE_WAIT;
Expand Down Expand Up @@ -699,6 +708,9 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name)
return UCS_ERR_UNSUPPORTED;
}
break;
case 'c':
print_config = 1;
break;
case 'h':
default:
print_usage();
Expand Down
19 changes: 12 additions & 7 deletions examples/uct_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,11 @@ int sendrecv(int sock, const void *sbuf, size_t slen, void **rbuf)
return 0;
}

static void progress_worker(void *arg)
{
uct_worker_progress((uct_worker_h)arg);
}

int main(int argc, char **argv)
{
uct_device_addr_t *peer_dev = NULL;
Expand Down Expand Up @@ -600,6 +605,11 @@ int main(int argc, char **argv)
CHKERR_JUMP(UCS_OK != status, "find supported device and transport",
out_destroy_worker);

/* Set active message handler */
status = uct_iface_set_am_handler(if_info.iface, id, hello_world,
&cmd_args.func_am_type, 0);
CHKERR_JUMP(UCS_OK != status, "set callback", out_destroy_iface);

own_dev = (uct_device_addr_t*)calloc(1, if_info.iface_attr.device_addr_len);
CHKERR_JUMP(NULL == own_dev, "allocate memory for dev addr",
out_destroy_iface);
Expand Down Expand Up @@ -660,7 +670,7 @@ int main(int argc, char **argv)

/* Connect endpoint to a remote endpoint */
status = uct_ep_connect_to_ep(ep, peer_dev, peer_ep);
if (barrier(oob_sock)) {
if (barrier(oob_sock, progress_worker, if_info.worker)) {
status = UCS_ERR_IO_ERROR;
goto out_free_ep;
}
Expand All @@ -685,11 +695,6 @@ int main(int argc, char **argv)
goto out_free_ep;
}

/* Set active message handler */
status = uct_iface_set_am_handler(if_info.iface, id, hello_world,
&cmd_args.func_am_type, 0);
CHKERR_JUMP(UCS_OK != status, "set callback", out_free_ep);

if (cmd_args.server_name) {
char *str = (char *)mem_type_malloc(cmd_args.test_strlen);
CHKERR_ACTION(str == NULL, "allocate memory",
Expand Down Expand Up @@ -729,7 +734,7 @@ int main(int argc, char **argv)
}
}

if (barrier(oob_sock)) {
if (barrier(oob_sock, progress_worker, if_info.worker)) {
status = UCS_ERR_IO_ERROR;
}

Expand Down
18 changes: 10 additions & 8 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1077,13 +1077,14 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
ucp_stream_ep_cleanup(ucp_ep);

if (ucp_ep->flags & UCP_EP_FLAG_USED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_CLOSED);
/* Promote close operation to CANCEL in case of transport error,
if (ucp_ep->flags & UCP_EP_FLAG_CLOSED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
/* Promote close operation to CANCEL in case of transport error,
* since the disconnect event may never arrive. */
close_req = ep_ext_control->close_req.req;
close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL;
ucp_ep_local_disconnect_progress(close_req);
close_req = ep_ext_control->close_req.req;
close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL;
ucp_ep_local_disconnect_progress(close_req);
}
} else if (ep_ext_control->err_cb == NULL) {
/* Do not print error if connection reset by remote peer since it
* can be part of user level close protocol */
Expand Down Expand Up @@ -1388,7 +1389,8 @@ void ucp_ep_destroy(ucp_ep_h ep)
ucp_worker_progress(worker);
status = ucp_request_check_status(request);
} while (status == UCS_INPROGRESS);

ucs_debug("ep_close request %p completed with status %s", request,
ucs_status_string(status));
ucp_request_release(request);
}

Expand Down Expand Up @@ -3132,7 +3134,7 @@ static ucs_status_t ucp_ep_query_sockaddr(ucp_ep_h ep, ucp_ep_attr_t *attr)
return status;
}
}

if (uct_cm_ep_attr.field_mask & UCT_EP_ATTR_FIELD_REMOTE_SOCKADDR) {
status = ucs_sockaddr_copy((struct sockaddr*)&attr->remote_sockaddr,
(struct sockaddr*)&uct_cm_ep_attr.remote_address);
Expand Down
21 changes: 9 additions & 12 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,19 +370,22 @@ uct_tcp_ep_zcopy_completed(uct_tcp_ep_t *ep, uct_completion_t *comp,
}
}

static void uct_tcp_ep_purge(uct_tcp_ep_t *ep)
static void uct_tcp_ep_purge(uct_tcp_ep_t *ep, ucs_status_t status)
{
uct_tcp_ep_put_completion_t *put_comp;
uct_tcp_ep_zcopy_tx_t *ctx;

ucs_debug("tcp_ep %p: purge outstanding operations with status %s", ep,
ucs_status_string(status));

if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) {
ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf;
uct_tcp_ep_zcopy_completed(ep, ctx->comp, UCS_ERR_CANCELED);
uct_tcp_ep_zcopy_completed(ep, ctx->comp, status);
uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset);
}

ucs_queue_for_each_extract(put_comp, &ep->put_comp_q, elem, 1) {
uct_invoke_completion(put_comp->comp, UCS_ERR_CANCELED);
uct_invoke_completion(put_comp->comp, status);
ucs_mpool_put_inline(put_comp);
}
}
Expand All @@ -403,7 +406,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_tcp_ep_t)
}

uct_tcp_ep_remove_ctx_cap(self, UCT_TCP_EP_CTX_CAPS);
uct_tcp_ep_purge(self);
uct_tcp_ep_purge(self, UCS_ERR_CANCELED);

if (self->flags & UCT_TCP_EP_FLAG_FAILED) {
/* a failed EP callback can be still scheduled on the UCT worker,
Expand Down Expand Up @@ -447,7 +450,7 @@ void uct_tcp_ep_destroy(uct_ep_h tl_ep)
/* remove TX capability, but still will be able to receive data */
uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_FLAG_CTX_TYPE_TX);
/* purge all outstanding operations (GET/PUT Zcopy, flush operations) */
uct_tcp_ep_purge(ep);
uct_tcp_ep_purge(ep, UCS_ERR_CANCELED);
uct_tcp_cm_insert_ep(iface, ep);
} else {
uct_tcp_ep_destroy_internal(tl_ep);
Expand Down Expand Up @@ -947,7 +950,6 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
uct_tcp_ep_zcopy_tx_t *ctx;

ucs_debug("tcp_ep %p: remote disconnected", ep);

Expand All @@ -957,12 +959,7 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status
ep->flags &= ~UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK;
}

if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) {
/* There is ongoing AM/PUT Zcopy operation, need to notify
* the user about the error */
ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf;
uct_tcp_ep_zcopy_completed(ep, ctx->comp, status);
}
uct_tcp_ep_purge(ep, status);

if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) {
/* if the EP is waiting for the acknowledgment of the started
Expand Down