diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 8eb1fe97886..e871997daa5 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1044,11 +1044,12 @@ static void ucp_ep_set_lanes_failed(ucp_ep_h ep, uct_ep_h *uct_eps) } } -void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, - ucs_status_t status) +ucs_status_t +ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucs_status_t status) { UCS_STRING_BUFFER_ONSTACK(lane_info_strb, 64); ucp_ep_ext_control_t *ep_ext_control = ucp_ep_ext_control(ucp_ep); + ucp_err_handling_mode_t err_mode; ucs_log_level_t log_level; ucp_request_t *close_req; @@ -1066,7 +1067,7 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, /* set endpoint to failed to prevent wireup_ep switch */ if (ucp_ep->flags & UCP_EP_FLAG_FAILED) { - return; + return UCS_OK; } /* The EP can be closed from last completion callback */ @@ -1083,19 +1084,25 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, close_req->send.flush.uct_flags |= UCT_FLUSH_FLAG_CANCEL; ucp_ep_local_disconnect_progress(close_req); } + return UCS_OK; } else if (ep_ext_control->err_cb == NULL) { - /* Do not print error if connection reset by remote peer since it - * can be part of user level close protocol */ - log_level = (status == UCS_ERR_CONNECTION_RESET) ? - UCS_LOG_LEVEL_DIAG : UCS_LOG_LEVEL_ERROR; + /* Print error if user requested error handling support but did not + install a valid error handling callback */ + err_mode = ucp_ep_config(ucp_ep)->key.err_mode; + log_level = (err_mode == UCP_ERR_HANDLING_MODE_NONE) ? + UCS_LOG_LEVEL_DIAG : + UCS_LOG_LEVEL_ERROR; ucp_ep_get_lane_info_str(ucp_ep, lane, &lane_info_strb); - ucs_log(log_level, "ep %p: error '%s' on %s will not be handled" + ucs_log(log_level, + "ep %p: error '%s' on %s will not be handled" " since no error callback is installed", ucp_ep, ucs_status_string(status), ucs_string_buffer_cstr(&lane_info_strb)); + return UCS_ERR_UNSUPPORTED; } else { ucp_ep_invoke_err_cb(ucp_ep, status); + return UCS_OK; } } else if (ucp_ep->flags & (UCP_EP_FLAG_INTERNAL | UCP_EP_FLAG_CLOSED)) { /* No additional actions are required, this is already closed EP or @@ -1103,10 +1110,12 @@ void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, * So, close operation was already scheduled, this EP will be deleted * after all lanes will be discarded successfully */ ucs_debug("ep %p: detected peer failure on internal endpoint", ucp_ep); + return UCS_OK; } else { ucs_debug("ep %p: destroy endpoint which is not exposed to a user due" " to peer failure", ucp_ep); ucp_ep_disconnected(ucp_ep, 1); + return UCS_OK; } } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index e1eb96ce9b4..058e9434ade 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -582,8 +582,8 @@ void ucp_ep_disconnected(ucp_ep_h ep, int force); void ucp_ep_destroy_internal(ucp_ep_h ep); -void ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, - ucs_status_t status); +ucs_status_t +ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucs_status_t status); void ucp_ep_set_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucs_status_t status); diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 06ff34398d3..d1990587e5d 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -427,14 +427,14 @@ void ucp_worker_signal_internal(ucp_worker_h worker) } } -static void +static ucs_status_t ucp_worker_iface_handle_uct_ep_failure(ucp_ep_h ucp_ep, ucp_lane_index_t lane, uct_ep_h uct_ep, ucs_status_t status) { ucp_wireup_ep_t *wireup_ep; if (ucp_ep->flags & UCP_EP_FLAG_FAILED) { - return; + return UCS_OK; } wireup_ep = ucp_wireup_ep(ucp_ep->uct_eps[lane]); @@ -443,14 +443,13 @@ ucp_worker_iface_handle_uct_ep_failure(ucp_ep_h ucp_ep, ucp_lane_index_t lane, !ucp_ep_is_local_connected(ucp_ep)) { /* Failure on NON-AUX EP or failure on AUX EP before it sent its address * means failure on the UCP EP */ - ucp_ep_set_failed(ucp_ep, lane, status); - return; + return ucp_ep_set_failed(ucp_ep, lane, status); } if (wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) { /* @ref ucp_wireup_ep_progress was scheduled, wireup ep and its * pending requests have to be handled there */ - return; + return UCS_OK; } /** @@ -462,6 +461,7 @@ ucp_worker_iface_handle_uct_ep_failure(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucp_wireup_ep_discard_aux_ep(wireup_ep, UCT_FLUSH_FLAG_CANCEL, ucp_destroyed_ep_pending_purge, ucp_ep); ucp_wireup_remote_connected(ucp_ep); + return UCS_OK; } static ucp_ep_h ucp_worker_find_lane(ucs_list_link_t *ep_list, uct_ep_h uct_ep, @@ -515,8 +515,8 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) } } - ucp_worker_iface_handle_uct_ep_failure(ucp_ep, lane, uct_ep, status); - status = UCS_OK; + status = ucp_worker_iface_handle_uct_ep_failure(ucp_ep, lane, uct_ep, + status); out: UCS_ASYNC_UNBLOCK(&worker->async);