diff --git a/contrib/test_jenkins.sh b/contrib/test_jenkins.sh index 64e4ecb24661..ded2bece5227 100755 --- a/contrib/test_jenkins.sh +++ b/contrib/test_jenkins.sh @@ -504,19 +504,28 @@ run_ucp_hello() { export UCX_KEEPALIVE_INTERVAL=1s export UCX_KEEPALIVE_NUM_EPS=10 + export UCX_LOG_LEVEL=info + export UCX_MM_ERROR_HANDLING=y - for test_mode in -w -f -b -erecv -esend -ekeepalive + for tls in all tcp shm do - for mem_type in $mem_types_list + export UCX_TLS=${tls} + for test_mode in -w -f -b -erecv -esend -ekeepalive do - echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ====" - run_hello ucp ${test_mode} -m ${mem_type} + for mem_type in $mem_types_list + do + echo "==== Running UCP hello world with mode ${test_mode} and \"${mem_type}\" memory type ====" + run_hello ucp ${test_mode} -m ${mem_type} + done done done rm -f ./ucp_hello_world unset UCX_KEEPALIVE_INTERVAL unset UCX_KEEPALIVE_NUM_EPS + unset UCX_LOG_LEVEL + unset UCX_TLS + unset UCX_MM_ERROR_HANDLING } # diff --git a/examples/ucp_hello_world.c b/examples/ucp_hello_world.c index 0658cd057c98..96029c931a26 100644 --- a/examples/ucp_hello_world.c +++ b/examples/ucp_hello_world.c @@ -85,6 +85,7 @@ static const ucp_tag_t tag = 0x1337a880u; static const ucp_tag_t tag_mask = UINT64_MAX; static const char *addr_msg_str = "UCX address message"; static const char *data_msg_str = "UCX data message"; +static int print_config = 0; static ucp_address_t *local_addr; static ucp_address_t *peer_addr; @@ -292,7 +293,7 @@ static int run_ucx_client(ucp_worker_h ucp_worker) CHKERR_JUMP(status != UCS_OK, "test_poll_wait\n", err_ep); } } - + if (err_handling_opt.failure_mode == FAILURE_MODE_KEEPALIVE) { fprintf(stderr, "Emulating unexpected failure after receive completion " "on client side, server should detect error by " @@ -546,7 +547,9 @@ int main(int argc, char **argv) status = ucp_init(&ucp_params, config, &ucp_context); - ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG); + if (print_config) { + ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG); + } ucp_config_release(config); CHKERR_JUMP(status != UCS_OK, "ucp_init\n", err); @@ -641,6 +644,7 @@ static void print_usage() "before receive completed\n"); fprintf(stderr, " keepalive - keepalive failure on client side " "after communication completed\n"); + fprintf(stderr, " -c Print UCP configuration\n"); print_common_help(); fprintf(stderr, "\n"); } @@ -652,7 +656,7 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name) err_handling_opt.ucp_err_mode = UCP_ERR_HANDLING_MODE_NONE; err_handling_opt.failure_mode = FAILURE_MODE_NONE; - while ((c = getopt(argc, argv, "wfbe:n:p:s:m:h")) != -1) { + while ((c = getopt(argc, argv, "wfbe:n:p:s:m:ch")) != -1) { switch (c) { case 'w': ucp_test_mode = TEST_MODE_WAIT; @@ -699,6 +703,9 @@ ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name) return UCS_ERR_UNSUPPORTED; } break; + case 'c': + print_config = 1; + break; case 'h': default: print_usage(); diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 51d121bf2a2a..40efba46d113 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1077,13 +1077,14 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucp_stream_ep_cleanup(ucp_ep); if (ucp_ep->flags & UCP_EP_FLAG_USED) { - if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) { - ucs_assert(ucp_ep->flags & UCP_EP_FLAG_CLOSED); - /* Promote close operation to CANCEL in case of transport error, + if (ucp_ep->flags & UCP_EP_FLAG_CLOSED) { + if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) { + /* Promote close operation to CANCEL in case of transport error, * since the disconnect event may never arrive. */ - close_req = ep_ext_control->close_req.req; - close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL; - ucp_ep_local_disconnect_progress(close_req); + close_req = ep_ext_control->close_req.req; + close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL; + ucp_ep_local_disconnect_progress(close_req); + } } else if (ep_ext_control->err_cb == NULL) { /* Do not print error if connection reset by remote peer since it * can be part of user level close protocol */ @@ -1388,7 +1389,8 @@ void ucp_ep_destroy(ucp_ep_h ep) ucp_worker_progress(worker); status = ucp_request_check_status(request); } while (status == UCS_INPROGRESS); - + ucs_debug("ep_close request %p completed with status %s", request, + ucs_status_string(status)); ucp_request_release(request); } @@ -3132,7 +3134,7 @@ static ucs_status_t ucp_ep_query_sockaddr(ucp_ep_h ep, ucp_ep_attr_t *attr) return status; } } - + if (uct_cm_ep_attr.field_mask & UCT_EP_ATTR_FIELD_REMOTE_SOCKADDR) { status = ucs_sockaddr_copy((struct sockaddr*)&attr->remote_sockaddr, (struct sockaddr*)&uct_cm_ep_attr.remote_address); diff --git a/src/uct/tcp/tcp_ep.c b/src/uct/tcp/tcp_ep.c index 5aee50f018bf..1bf5d7c8c2ae 100644 --- a/src/uct/tcp/tcp_ep.c +++ b/src/uct/tcp/tcp_ep.c @@ -370,19 +370,22 @@ uct_tcp_ep_zcopy_completed(uct_tcp_ep_t *ep, uct_completion_t *comp, } } -static void uct_tcp_ep_purge(uct_tcp_ep_t *ep) +static void uct_tcp_ep_purge(uct_tcp_ep_t *ep, ucs_status_t status) { uct_tcp_ep_put_completion_t *put_comp; uct_tcp_ep_zcopy_tx_t *ctx; + ucs_debug("tcp_ep %p: purge outstanding operations with status %s", ep, + ucs_status_string(status)); + if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) { ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf; - uct_tcp_ep_zcopy_completed(ep, ctx->comp, UCS_ERR_CANCELED); + uct_tcp_ep_zcopy_completed(ep, ctx->comp, status); uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset); } ucs_queue_for_each_extract(put_comp, &ep->put_comp_q, elem, 1) { - uct_invoke_completion(put_comp->comp, UCS_ERR_CANCELED); + uct_invoke_completion(put_comp->comp, status); ucs_mpool_put_inline(put_comp); } } @@ -403,7 +406,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_tcp_ep_t) } uct_tcp_ep_remove_ctx_cap(self, UCT_TCP_EP_CTX_CAPS); - uct_tcp_ep_purge(self); + uct_tcp_ep_purge(self, UCS_ERR_CANCELED); if (self->flags & UCT_TCP_EP_FLAG_FAILED) { /* a failed EP callback can be still scheduled on the UCT worker, @@ -447,7 +450,7 @@ void uct_tcp_ep_destroy(uct_ep_h tl_ep) /* remove TX capability, but still will be able to receive data */ uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_FLAG_CTX_TYPE_TX); /* purge all outstanding operations (GET/PUT Zcopy, flush operations) */ - uct_tcp_ep_purge(ep); + uct_tcp_ep_purge(ep, UCS_ERR_CANCELED); uct_tcp_cm_insert_ep(iface, ep); } else { uct_tcp_ep_destroy_internal(tl_ep); @@ -947,7 +950,6 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status { uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_tcp_iface_t); - uct_tcp_ep_zcopy_tx_t *ctx; ucs_debug("tcp_ep %p: remote disconnected", ep); @@ -957,12 +959,7 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status ep->flags &= ~UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK; } - if (ep->flags & UCT_TCP_EP_FLAG_ZCOPY_TX) { - /* There is ongoing AM/PUT Zcopy operation, need to notify - * the user about the error */ - ctx = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf; - uct_tcp_ep_zcopy_completed(ep, ctx->comp, status); - } + uct_tcp_ep_purge(ep, status); if (ep->flags & UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK) { /* if the EP is waiting for the acknowledgment of the started @@ -970,8 +967,6 @@ static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep, ucs_status_t status uct_tcp_iface_outstanding_dec(iface); ep->flags &= ~UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK; } - - uct_tcp_ep_tx_completed(ep, ep->tx.length - ep->tx.offset); } uct_tcp_ep_set_failed(ep);