diff --git a/contrib/test_jenkins.sh b/contrib/test_jenkins.sh index 64e4ecb2466..af619425e0c 100755 --- a/contrib/test_jenkins.sh +++ b/contrib/test_jenkins.sh @@ -504,19 +504,26 @@ run_ucp_hello() { export UCX_KEEPALIVE_INTERVAL=1s export UCX_KEEPALIVE_NUM_EPS=10 + export UCX_LOG_LEVEL=info - for test_mode in -w -f -b -erecv -esend -ekeepalive + for tls in all tcp,cuda do - for mem_type in $mem_types_list + export UCX_TLS=${tls} + for test_mode in -w -f -b -erecv -esend -ekeepalive do - echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ====" - run_hello ucp ${test_mode} -m ${mem_type} + for mem_type in $mem_types_list + do + echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ====" + run_hello ucp ${test_mode} -m ${mem_type} + done done done rm -f ./ucp_hello_world unset UCX_KEEPALIVE_INTERVAL unset UCX_KEEPALIVE_NUM_EPS + unset UCX_LOG_LEVEL + unset UCX_TLS } # diff --git a/examples/hello_world_util.h b/examples/hello_world_util.h index 51cff49c830..6f3a3c16636 100644 --- a/examples/hello_world_util.h +++ b/examples/hello_world_util.h @@ -9,6 +9,7 @@ #include +#include #include #include #include @@ -265,8 +266,10 @@ int client_connect(const char *server, uint16_t server_port) return -1; } -static inline int barrier(int oob_sock) +static inline int +barrier(int oob_sock, void (*progress_cb)(void *arg), void *arg) { + struct pollfd pfd; int dummy = 0; ssize_t res; @@ -275,6 +278,14 @@ static inline int barrier(int oob_sock) return res; } + pfd.fd = oob_sock; + pfd.events = POLLIN; + pfd.revents = 0; + do { + res = poll(&pfd, 1, 1); + progress_cb(arg); + } while (res != 1); + res = recv(oob_sock, &dummy, sizeof(dummy), MSG_WAITALL); /* number of received bytes should be the same as sent */ diff --git a/examples/ucp_hello_world.c b/examples/ucp_hello_world.c index 0658cd057c9..c45e5ae2bbb 100644 --- a/examples/ucp_hello_world.c +++ b/examples/ucp_hello_world.c @@ -85,6 +85,7 @@ static const ucp_tag_t tag = 0x1337a880u; static const ucp_tag_t tag_mask = UINT64_MAX; static const char *addr_msg_str = "UCX address message"; static const char *data_msg_str = "UCX data message"; +static int print_config = 0; static ucp_address_t *local_addr; static ucp_address_t *peer_addr; @@ -292,7 +293,7 @@ static int run_ucx_client(ucp_worker_h ucp_worker) CHKERR_JUMP(status != UCS_OK, "test_poll_wait\n", err_ep); } } - + if (err_handling_opt.failure_mode == FAILURE_MODE_KEEPALIVE) { fprintf(stderr, "Emulating unexpected failure after receive completion " "on client side, server should detect error by " @@ -505,6 +506,11 @@ static int run_test(const char *client_target_name, ucp_worker_h ucp_worker) } } +static void progress_worker(void *arg) +{ + ucp_worker_progress((ucp_worker_h)arg); +} + int main(int argc, char **argv) { /* UCP temporary vars */ @@ -546,7 +552,9 @@ int main(int argc, char **argv) status = ucp_init(&ucp_params, config, &ucp_context); - ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG); + if (print_config) { + ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG); + } ucp_config_release(config); CHKERR_JUMP(status != UCS_OK, "ucp_init\n", err); @@ -597,9 +605,9 @@ int main(int argc, char **argv) ret = run_test(client_target_name, ucp_worker); - if (!ret && (err_handling_opt.failure_mode != FAILURE_MODE_NONE)) { + if (!ret && (err_handling_opt.failure_mode == FAILURE_MODE_NONE)) { /* Make sure remote is disconnected before destroying local worker */ - ret = barrier(oob_sock); + ret = barrier(oob_sock, progress_worker, ucp_worker); } close(oob_sock); @@ -641,6 +649,7 @@ static void print_usage() "before receive completed\n"); fprintf(stderr, " keepalive - keepalive failure on client side " "after communication completed\n"); + fprintf(stderr, " -c Print UCP configuration\n"); print_common_help(); fprintf(stderr, "\n"); } @@ -652,7 +661,7 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name) err_handling_opt.ucp_err_mode = UCP_ERR_HANDLING_MODE_NONE; err_handling_opt.failure_mode = FAILURE_MODE_NONE; - while ((c = getopt(argc, argv, "wfbe:n:p:s:m:h")) != -1) { + while ((c = getopt(argc, argv, "wfbe:n:p:s:m:ch")) != -1) { switch (c) { case 'w': ucp_test_mode = TEST_MODE_WAIT; @@ -699,6 +708,9 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name) return UCS_ERR_UNSUPPORTED; } break; + case 'c': + print_config = 1; + break; case 'h': default: print_usage(); diff --git a/examples/uct_hello_world.c b/examples/uct_hello_world.c index 66a51d81f39..f97545f0458 100644 --- a/examples/uct_hello_world.c +++ b/examples/uct_hello_world.c @@ -561,6 +561,11 @@ int sendrecv(int sock, const void *sbuf, size_t slen, void **rbuf) return 0; } +static void progress_worker(void *arg) +{ + uct_worker_progress((uct_worker_h)arg); +} + int main(int argc, char **argv) { uct_device_addr_t *peer_dev = NULL; @@ -600,6 +605,11 @@ int main(int argc, char **argv) CHKERR_JUMP(UCS_OK != status, "find supported device and transport", out_destroy_worker); + /* Set active message handler */ + status = uct_iface_set_am_handler(if_info.iface, id, hello_world, + &cmd_args.func_am_type, 0); + CHKERR_JUMP(UCS_OK != status, "set callback", out_destroy_iface); + own_dev = (uct_device_addr_t*)calloc(1, if_info.iface_attr.device_addr_len); CHKERR_JUMP(NULL == own_dev, "allocate memory for dev addr", out_destroy_iface); @@ -660,7 +670,7 @@ int main(int argc, char **argv) /* Connect endpoint to a remote endpoint */ status = uct_ep_connect_to_ep(ep, peer_dev, peer_ep); - if (barrier(oob_sock)) { + if (barrier(oob_sock, progress_worker, if_info.worker)) { status = UCS_ERR_IO_ERROR; goto out_free_ep; } @@ -685,11 +695,6 @@ int main(int argc, char **argv) goto out_free_ep; } - /* Set active message handler */ - status = uct_iface_set_am_handler(if_info.iface, id, hello_world, - &cmd_args.func_am_type, 0); - CHKERR_JUMP(UCS_OK != status, "set callback", out_free_ep); - if (cmd_args.server_name) { char *str = (char *)mem_type_malloc(cmd_args.test_strlen); CHKERR_ACTION(str == NULL, "allocate memory", @@ -729,7 +734,7 @@ int main(int argc, char **argv) } } - if (barrier(oob_sock)) { + if (barrier(oob_sock, progress_worker, if_info.worker)) { status = UCS_ERR_IO_ERROR; } diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 51d121bf2a2..40efba46d11 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1077,13 +1077,14 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucp_stream_ep_cleanup(ucp_ep); if (ucp_ep->flags & UCP_EP_FLAG_USED) { - if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) { - ucs_assert(ucp_ep->flags & UCP_EP_FLAG_CLOSED); - /* Promote close operation to CANCEL in case of transport error, + if (ucp_ep->flags & UCP_EP_FLAG_CLOSED) { + if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) { + /* Promote close operation to CANCEL in case of transport error, * since the disconnect event may never arrive. */ - close_req = ep_ext_control->close_req.req; - close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL; - ucp_ep_local_disconnect_progress(close_req); + close_req = ep_ext_control->close_req.req; + close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL; + ucp_ep_local_disconnect_progress(close_req); + } } else if (ep_ext_control->err_cb == NULL) { /* Do not print error if connection reset by remote peer since it * can be part of user level close protocol */ @@ -1388,7 +1389,8 @@ void ucp_ep_destroy(ucp_ep_h ep) ucp_worker_progress(worker); status = ucp_request_check_status(request); } while (status == UCS_INPROGRESS); - + ucs_debug("ep_close request %p completed with status %s", request, + ucs_status_string(status)); ucp_request_release(request); } @@ -3132,7 +3134,7 @@ static ucs_status_t ucp_ep_query_sockaddr(ucp_ep_h ep, ucp_ep_attr_t *attr) return status; } } - + if (uct_cm_ep_attr.field_mask & UCT_EP_ATTR_FIELD_REMOTE_SOCKADDR) { status = ucs_sockaddr_copy((struct sockaddr*)&attr->remote_sockaddr, (struct sockaddr*)&uct_cm_ep_attr.remote_address); diff --git a/src/uct/tcp/tcp_ep.c b/src/uct/tcp/tcp_ep.c index 5aee50f018b..d039de9bb08 100644 --- a/src/uct/tcp/tcp_ep.c +++ b/src/uct/tcp/tcp_ep.c @@ -370,19 +370,22 @@ uct_tcp_ep_zcopy_completed(uct_tcp_ep_t *ep, uct_completion_t *comp, } } -static void uct_tcp_ep_purge(uct_tcp_ep_t *ep) +static void uct_tcp_ep_purge(uct_tcp_ep_t *ep, ucs_status_t status) { uct_tcp_ep_put_completion_t *put_comp; uct_tcp_ep_zcopy_tx_t *ctx; + ucs_debug("tcp_ep %p: purge outstanding operations with status %s", ep, + ucs_status_string(status)); + if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) { ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf; - uct_tcp_ep_zcopy_completed(ep, ctx->comp, UCS_ERR_CANCELED); + uct_tcp_ep_zcopy_completed(ep, ctx->comp, status); uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset); } ucs_queue_for_each_extract(put_comp, &ep->put_comp_q, elem, 1) { - uct_invoke_completion(put_comp->comp, UCS_ERR_CANCELED); + uct_invoke_completion(put_comp->comp, status); ucs_mpool_put_inline(put_comp); } } @@ -403,7 +406,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_tcp_ep_t) } uct_tcp_ep_remove_ctx_cap(self, UCT_TCP_EP_CTX_CAPS); - uct_tcp_ep_purge(self); + uct_tcp_ep_purge(self, UCS_ERR_CANCELED); if (self->flags & UCT_TCP_EP_FLAG_FAILED) { /* a failed EP callback can be still scheduled on the UCT worker, @@ -447,7 +450,7 @@ void uct_tcp_ep_destroy(uct_ep_h tl_ep) /* remove TX capability, but still will be able to receive data */ uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_FLAG_CTX_TYPE_TX); /* purge all outstanding operations (GET/PUT Zcopy, flush operations) */ - uct_tcp_ep_purge(ep); + uct_tcp_ep_purge(ep, UCS_ERR_CANCELED); uct_tcp_cm_insert_ep(iface, ep); } else { uct_tcp_ep_destroy_internal(tl_ep); @@ -947,7 +950,6 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status { uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_tcp_iface_t); - uct_tcp_ep_zcopy_tx_t *ctx; ucs_debug("tcp_ep %p: remote disconnected", ep); @@ -957,12 +959,7 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status ep->flags &= ~UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK; } - if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) { - /* There is ongoing AM/PUT Zcopy operation, need to notify - * the user about the error */ - ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf; - uct_tcp_ep_zcopy_completed(ep, ctx->comp, status); - } + uct_tcp_ep_purge(ep, status); if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) { /* if the EP is waiting for the acknowledgment of the started