Skip to content

Commit

Permalink
Merge pull request #7224 from yosefe/topic/tcp-test-fix-simultaneous-…
Browse files Browse the repository at this point in the history
…ep-close-with

TCP/TEST: Fix simultaneous ep close with ucp_hello_world
  • Loading branch information
yosefe authored Aug 17, 2021
2 parents de9fbfc + f7a4757 commit 70c347e
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 37 deletions.
15 changes: 11 additions & 4 deletions contrib/test_jenkins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -508,19 +508,26 @@ run_ucp_hello() {

export UCX_KEEPALIVE_INTERVAL=1s
export UCX_KEEPALIVE_NUM_EPS=10
export UCX_LOG_LEVEL=info

for test_mode in -w -f -b -erecv -esend -ekeepalive
for tls in all tcp,cuda
do
for mem_type in $mem_types_list
export UCX_TLS=${tls}
for test_mode in -w -f -b -erecv -esend -ekeepalive
do
echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ===="
run_hello ucp ${test_mode} -m ${mem_type}
for mem_type in $mem_types_list
do
echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ===="
run_hello ucp ${test_mode} -m ${mem_type}
done
done
done
rm -f ./ucp_hello_world

unset UCX_KEEPALIVE_INTERVAL
unset UCX_KEEPALIVE_NUM_EPS
unset UCX_LOG_LEVEL
unset UCX_TLS
}

#
Expand Down
13 changes: 12 additions & 1 deletion examples/hello_world_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <ucs/memory/memory_type.h>

#include <sys/poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down Expand Up @@ -265,8 +266,10 @@ int client_connect(const char *server, uint16_t server_port)
return -1;
}

static inline int barrier(int oob_sock)
static inline int
barrier(int oob_sock, void (*progress_cb)(void *arg), void *arg)
{
struct pollfd pfd;
int dummy = 0;
ssize_t res;

Expand All @@ -275,6 +278,14 @@ static inline int barrier(int oob_sock)
return res;
}

pfd.fd = oob_sock;
pfd.events = POLLIN;
pfd.revents = 0;
do {
res = poll(&pfd, 1, 1);
progress_cb(arg);
} while (res != 1);

res = recv(oob_sock, &dummy, sizeof(dummy), MSG_WAITALL);

/* number of received bytes should be the same as sent */
Expand Down
22 changes: 17 additions & 5 deletions examples/ucp_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ static const ucp_tag_t tag = 0x1337a880u;
static const ucp_tag_t tag_mask = UINT64_MAX;
static const char *addr_msg_str = "UCX address message";
static const char *data_msg_str = "UCX data message";
static int print_config = 0;
static ucp_address_t *local_addr;
static ucp_address_t *peer_addr;

Expand Down Expand Up @@ -292,7 +293,7 @@ static int run_ucx_client(ucp_worker_h ucp_worker)
CHKERR_JUMP(status != UCS_OK, "test_poll_wait\n", err_ep);
}
}

if (err_handling_opt.failure_mode == FAILURE_MODE_KEEPALIVE) {
fprintf(stderr, "Emulating unexpected failure after receive completion "
"on client side, server should detect error by "
Expand Down Expand Up @@ -505,6 +506,11 @@ static int run_test(const char *client_target_name, ucp_worker_h ucp_worker)
}
}

static void progress_worker(void *arg)
{
ucp_worker_progress((ucp_worker_h)arg);
}

int main(int argc, char **argv)
{
/* UCP temporary vars */
Expand Down Expand Up @@ -546,7 +552,9 @@ int main(int argc, char **argv)

status = ucp_init(&ucp_params, config, &ucp_context);

ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG);
if (print_config) {
ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG);
}

ucp_config_release(config);
CHKERR_JUMP(status != UCS_OK, "ucp_init\n", err);
Expand Down Expand Up @@ -597,9 +605,9 @@ int main(int argc, char **argv)

ret = run_test(client_target_name, ucp_worker);

if (!ret && (err_handling_opt.failure_mode != FAILURE_MODE_NONE)) {
if (!ret && (err_handling_opt.failure_mode == FAILURE_MODE_NONE)) {
/* Make sure remote is disconnected before destroying local worker */
ret = barrier(oob_sock);
ret = barrier(oob_sock, progress_worker, ucp_worker);
}
close(oob_sock);

Expand Down Expand Up @@ -641,6 +649,7 @@ static void print_usage()
"before receive completed\n");
fprintf(stderr, " keepalive - keepalive failure on client side "
"after communication completed\n");
fprintf(stderr, " -c Print UCP configuration\n");
print_common_help();
fprintf(stderr, "\n");
}
Expand All @@ -652,7 +661,7 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name)
err_handling_opt.ucp_err_mode = UCP_ERR_HANDLING_MODE_NONE;
err_handling_opt.failure_mode = FAILURE_MODE_NONE;

while ((c = getopt(argc, argv, "wfbe:n:p:s:m:h")) != -1) {
while ((c = getopt(argc, argv, "wfbe:n:p:s:m:ch")) != -1) {
switch (c) {
case 'w':
ucp_test_mode = TEST_MODE_WAIT;
Expand Down Expand Up @@ -699,6 +708,9 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name)
return UCS_ERR_UNSUPPORTED;
}
break;
case 'c':
print_config = 1;
break;
case 'h':
default:
print_usage();
Expand Down
19 changes: 12 additions & 7 deletions examples/uct_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,11 @@ int sendrecv(int sock, const void *sbuf, size_t slen, void **rbuf)
return 0;
}

static void progress_worker(void *arg)
{
uct_worker_progress((uct_worker_h)arg);
}

int main(int argc, char **argv)
{
uct_device_addr_t *peer_dev = NULL;
Expand Down Expand Up @@ -600,6 +605,11 @@ int main(int argc, char **argv)
CHKERR_JUMP(UCS_OK != status, "find supported device and transport",
out_destroy_worker);

/* Set active message handler */
status = uct_iface_set_am_handler(if_info.iface, id, hello_world,
&cmd_args.func_am_type, 0);
CHKERR_JUMP(UCS_OK != status, "set callback", out_destroy_iface);

own_dev = (uct_device_addr_t*)calloc(1, if_info.iface_attr.device_addr_len);
CHKERR_JUMP(NULL == own_dev, "allocate memory for dev addr",
out_destroy_iface);
Expand Down Expand Up @@ -660,7 +670,7 @@ int main(int argc, char **argv)

/* Connect endpoint to a remote endpoint */
status = uct_ep_connect_to_ep(ep, peer_dev, peer_ep);
if (barrier(oob_sock)) {
if (barrier(oob_sock, progress_worker, if_info.worker)) {
status = UCS_ERR_IO_ERROR;
goto out_free_ep;
}
Expand All @@ -685,11 +695,6 @@ int main(int argc, char **argv)
goto out_free_ep;
}

/* Set active message handler */
status = uct_iface_set_am_handler(if_info.iface, id, hello_world,
&cmd_args.func_am_type, 0);
CHKERR_JUMP(UCS_OK != status, "set callback", out_free_ep);

if (cmd_args.server_name) {
char *str = (char *)mem_type_malloc(cmd_args.test_strlen);
CHKERR_ACTION(str == NULL, "allocate memory",
Expand Down Expand Up @@ -729,7 +734,7 @@ int main(int argc, char **argv)
}
}

if (barrier(oob_sock)) {
if (barrier(oob_sock, progress_worker, if_info.worker)) {
status = UCS_ERR_IO_ERROR;
}

Expand Down
18 changes: 10 additions & 8 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1077,13 +1077,14 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
ucp_stream_ep_cleanup(ucp_ep);

if (ucp_ep->flags & UCP_EP_FLAG_USED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_CLOSED);
/* Promote close operation to CANCEL in case of transport error,
if (ucp_ep->flags & UCP_EP_FLAG_CLOSED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
/* Promote close operation to CANCEL in case of transport error,
* since the disconnect event may never arrive. */
close_req = ep_ext_control->close_req.req;
close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL;
ucp_ep_local_disconnect_progress(close_req);
close_req = ep_ext_control->close_req.req;
close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL;
ucp_ep_local_disconnect_progress(close_req);
}
} else if (ep_ext_control->err_cb == NULL) {
/* Do not print error if connection reset by remote peer since it
* can be part of user level close protocol */
Expand Down Expand Up @@ -1388,7 +1389,8 @@ void ucp_ep_destroy(ucp_ep_h ep)
ucp_worker_progress(worker);
status = ucp_request_check_status(request);
} while (status == UCS_INPROGRESS);

ucs_debug("ep_close request %p completed with status %s", request,
ucs_status_string(status));
ucp_request_release(request);
}

Expand Down Expand Up @@ -3132,7 +3134,7 @@ static ucs_status_t ucp_ep_query_sockaddr(ucp_ep_h ep, ucp_ep_attr_t *attr)
return status;
}
}

if (uct_cm_ep_attr.field_mask & UCT_EP_ATTR_FIELD_REMOTE_SOCKADDR) {
status = ucs_sockaddr_copy((struct sockaddr*)&attr->remote_sockaddr,
(struct sockaddr*)&uct_cm_ep_attr.remote_address);
Expand Down
21 changes: 9 additions & 12 deletions src/uct/tcp/tcp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,19 +370,22 @@ uct_tcp_ep_zcopy_completed(uct_tcp_ep_t *ep, uct_completion_t *comp,
}
}

static void uct_tcp_ep_purge(uct_tcp_ep_t *ep)
static void uct_tcp_ep_purge(uct_tcp_ep_t *ep, ucs_status_t status)
{
uct_tcp_ep_put_completion_t *put_comp;
uct_tcp_ep_zcopy_tx_t *ctx;

ucs_debug("tcp_ep %p: purge outstanding operations with status %s", ep,
ucs_status_string(status));

if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) {
ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf;
uct_tcp_ep_zcopy_completed(ep, ctx->comp, UCS_ERR_CANCELED);
uct_tcp_ep_zcopy_completed(ep, ctx->comp, status);
uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset);
}

ucs_queue_for_each_extract(put_comp, &ep->put_comp_q, elem, 1) {
uct_invoke_completion(put_comp->comp, UCS_ERR_CANCELED);
uct_invoke_completion(put_comp->comp, status);
ucs_mpool_put_inline(put_comp);
}
}
Expand All @@ -403,7 +406,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_tcp_ep_t)
}

uct_tcp_ep_remove_ctx_cap(self, UCT_TCP_EP_CTX_CAPS);
uct_tcp_ep_purge(self);
uct_tcp_ep_purge(self, UCS_ERR_CANCELED);

if (self->flags & UCT_TCP_EP_FLAG_FAILED) {
/* a failed EP callback can be still scheduled on the UCT worker,
Expand Down Expand Up @@ -447,7 +450,7 @@ void uct_tcp_ep_destroy(uct_ep_h tl_ep)
/* remove TX capability, but still will be able to receive data */
uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_FLAG_CTX_TYPE_TX);
/* purge all outstanding operations (GET/PUT Zcopy, flush operations) */
uct_tcp_ep_purge(ep);
uct_tcp_ep_purge(ep, UCS_ERR_CANCELED);
uct_tcp_cm_insert_ep(iface, ep);
} else {
uct_tcp_ep_destroy_internal(tl_ep);
Expand Down Expand Up @@ -947,7 +950,6 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
uct_tcp_ep_zcopy_tx_t *ctx;

ucs_debug("tcp_ep %p: remote disconnected", ep);

Expand All @@ -957,12 +959,7 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status
ep->flags &= ~UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK;
}

if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) {
/* There is ongoing AM/PUT Zcopy operation, need to notify
* the user about the error */
ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf;
uct_tcp_ep_zcopy_completed(ep, ctx->comp, status);
}
uct_tcp_ep_purge(ep, status);

if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) {
/* if the EP is waiting for the acknowledgment of the started
Expand Down

0 comments on commit 70c347e

Please sign in to comment.