From 23bb0ba295434c4540b2c70511796c651b13170e Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Wed, 18 Aug 2021 15:14:31 +0300 Subject: [PATCH 1/2] UCT/TEST: Keepalive to call error callback from progress queue When a shared-memory keepalive check done by uct_ep_keepalive_check() fails, the error callback needs to be called. However, it cannot be called in the same context, since the error callback has to be called from uct_worker_progress() context. So instead of calling err_handler directly, schedule an operation on the progress queue. Without this fix, the UCP error callback was called in keepalive context and changed the ep state flags to FAILED unexpectedly, which caused an assertion failure. --- contrib/test_jenkins.sh | 4 +- src/ucp/core/ucp_ep.c | 3 ++ src/uct/base/uct_iface.c | 84 ++++++++++++++++++++++++----- src/uct/base/uct_iface.h | 6 +-- test/gtest/uct/test_peer_failure.cc | 56 ++++++++++--------- 5 files changed, 111 insertions(+), 42 deletions(-) diff --git a/contrib/test_jenkins.sh b/contrib/test_jenkins.sh index 6592d9819ca..01a0a46dcb9 100755 --- a/contrib/test_jenkins.sh +++ b/contrib/test_jenkins.sh @@ -503,8 +503,9 @@ run_ucp_hello() { export UCX_KEEPALIVE_INTERVAL=1s export UCX_KEEPALIVE_NUM_EPS=10 export UCX_LOG_LEVEL=info + export UCX_MM_ERROR_HANDLING=y - for tls in all tcp,cuda + for tls in all tcp,cuda shm,cuda do export UCX_TLS=${tls} for test_mode in -w -f -b -erecv -esend -ekeepalive @@ -522,6 +523,7 @@ run_ucp_hello() { unset UCX_KEEPALIVE_NUM_EPS unset UCX_LOG_LEVEL unset UCX_TLS + unset UCX_MM_ERROR_HANDLING } # diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 48e6e08e635..8eb1fe97886 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -2928,6 +2928,9 @@ int ucp_ep_do_keepalive(ucp_ep_h ep) ucs_assert((rsc_index != UCP_NULL_RESOURCE) || (lane == ucp_ep_get_cm_lane(ep))); + ucs_trace("ep %p: do keepalive on lane[%d]=%p ep->flags=0x%x", ep, lane, + ep->uct_eps[lane], ep->flags); + status = ucp_ep_do_uct_ep_keepalive(ep,
ep->uct_eps[lane], rsc_index, 0, NULL); if (status == UCS_ERR_NO_RESOURCE) { diff --git a/src/uct/base/uct_iface.c b/src/uct/base/uct_iface.c index bcedd8474a7..d0d5da013ef 100644 --- a/src/uct/base/uct_iface.c +++ b/src/uct/base/uct_iface.c @@ -22,6 +22,12 @@ #include +typedef struct uct_base_ep_error_handle_info { + uct_ep_h ep; + ucs_status_t status; +} uct_base_ep_error_handle_info_t; + + #ifdef ENABLE_STATS static ucs_stats_class_t uct_ep_stats_class = { .name = "uct_ep", @@ -162,7 +168,7 @@ void uct_iface_set_async_event_params(const uct_iface_params_t *params, void **event_arg) { *event_cb = UCT_IFACE_PARAM_VALUE(params, async_event_cb, ASYNC_EVENT_CB, - NULL); + NULL); *event_arg = UCT_IFACE_PARAM_VALUE(params, async_event_arg, ASYNC_EVENT_ARG, NULL); } @@ -580,6 +586,33 @@ UCS_CLASS_CLEANUP_FUNC(uct_ep_t) UCS_CLASS_DEFINE(uct_ep_t, void); +static unsigned uct_iface_ep_error_handle_progress(void *arg) +{ + uct_base_ep_error_handle_info_t *err_info = arg; + uct_base_iface_t *iface; + + iface = ucs_derived_of(err_info->ep->iface, uct_base_iface_t); + iface->err_handler(iface->err_handler_arg, err_info->ep, err_info->status); + ucs_free(err_info); + return 1; +} + +static int +uct_iface_ep_error_handle_progress_remove(const ucs_callbackq_elem_t *elem, + void *arg) +{ + uct_base_ep_error_handle_info_t *err_info = elem->arg; + uct_base_ep_t *ep = arg; + + if ((elem->cb == uct_iface_ep_error_handle_progress) && + (err_info->ep == &ep->super)) { + ucs_free(err_info); + return 1; + } + + return 0; +} + UCS_CLASS_INIT_FUNC(uct_base_ep_t, uct_base_iface_t *iface) { UCS_CLASS_CALL_SUPER_INIT(uct_ep_t, &iface->super); @@ -590,6 +623,11 @@ UCS_CLASS_INIT_FUNC(uct_base_ep_t, uct_base_iface_t *iface) static UCS_CLASS_CLEANUP_FUNC(uct_base_ep_t) { + uct_base_iface_t *iface = ucs_derived_of(self->super.iface, + uct_base_iface_t); + + ucs_callbackq_remove_if(&iface->worker->super.progress_q, + uct_iface_ep_error_handle_progress_remove, self); 
UCS_STATS_NODE_FREE(self->stats); } @@ -736,30 +774,52 @@ ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p) return status; } -ucs_status_t -uct_ep_keepalive_check(uct_ep_h tl_ep, uct_keepalive_info_t **ka, pid_t pid, - unsigned flags, uct_completion_t *comp) +static ucs_status_t uct_iface_schedule_ep_err(uct_ep_h ep, ucs_status_t status) +{ + uct_base_iface_t *iface = ucs_derived_of(ep->iface, uct_base_iface_t); + uct_base_ep_error_handle_info_t *err_info; + + if (iface->err_handler == NULL) { + ucs_diag("ep %p: unhandled error %s", ep, ucs_status_string(status)); + return UCS_OK; + } + + err_info = ucs_malloc(sizeof(*err_info), "uct_base_ep_err"); + if (err_info == NULL) { + return UCS_ERR_NO_MEMORY; + } + + err_info->ep = ep; + err_info->status = status; + ucs_callbackq_add_safe(&iface->worker->super.progress_q, + uct_iface_ep_error_handle_progress, err_info, + UCS_CALLBACKQ_FLAG_ONESHOT); + return UCS_OK; +} + +ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka, + pid_t pid, unsigned flags, + uct_completion_t *comp) { ucs_status_t status; ucs_time_t create_time; UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp); - if (ucs_unlikely(*ka == NULL)) { + if (*ka == NULL) { status = uct_ep_keepalive_create(pid, ka); - if (status != UCS_OK) { - return uct_iface_handle_ep_err(tl_ep->iface, tl_ep, status); - } } else { status = ucs_sys_get_file_time((*ka)->proc, UCS_SYS_FILE_TIME_CTIME, &create_time); - if (ucs_unlikely((status != UCS_OK) || - ((*ka)->start_time != create_time))) { - return uct_iface_handle_ep_err(tl_ep->iface, tl_ep, - UCS_ERR_ENDPOINT_TIMEOUT); + if ((status != UCS_OK) || ((*ka)->start_time != create_time)) { + status = UCS_ERR_ENDPOINT_TIMEOUT; } } + if (status != UCS_OK) { + return uct_iface_schedule_ep_err(ep, status); + } + return UCS_OK; } diff --git a/src/uct/base/uct_iface.h b/src/uct/base/uct_iface.h index 67b3353b69a..e86eee101d5 100644 --- a/src/uct/base/uct_iface.h +++ b/src/uct/base/uct_iface.h @@ 
-842,9 +842,9 @@ int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid); ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p); -ucs_status_t -uct_ep_keepalive_check(uct_ep_h tl_ep, uct_keepalive_info_t **ka, pid_t pid, - unsigned flags, uct_completion_t *comp); +ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka, + pid_t pid, unsigned flags, + uct_completion_t *comp); void uct_ep_set_iface(uct_ep_h ep, uct_iface_t *iface); diff --git a/test/gtest/uct/test_peer_failure.cc b/test/gtest/uct/test_peer_failure.cc index 06bc260232c..e264c1cbad4 100644 --- a/test/gtest/uct/test_peer_failure.cc +++ b/test/gtest/uct/test_peer_failure.cc @@ -473,66 +473,70 @@ UCS_TEST_SKIP_COND_P(test_uct_peer_failure_multiple, test, UCT_INSTANTIATE_TEST_CASE(test_uct_peer_failure_multiple) -class test_uct_keepalive : public ucs::test { +class test_uct_keepalive : public uct_test { public: - test_uct_keepalive() + test_uct_keepalive() : + m_ka(NULL), m_pid(getpid()), m_entity(NULL), m_err_handler_count(0) { - m_ka = NULL; - m_pid = getpid(); } void init() { - m_err_handler_count = 0; - ASSERT_UCS_OK(uct_ep_keepalive_create(m_pid, &m_ka)); + + m_entity = create_entity(0, err_handler_cb); + m_entity->connect(0, *m_entity, 0); + m_entities.push_back(m_entity); } void cleanup() { + m_entities.clear(); ucs_free(m_ka); } static ucs_status_t err_handler_cb(void *arg, uct_ep_h ep, ucs_status_t status) { - m_err_handler_count++; - return status; + test_uct_keepalive *self = reinterpret_cast<test_uct_keepalive*>(arg); + self->m_err_handler_count++; + return UCS_OK; } protected: + void do_keepalive() + { + ucs_status_t status = uct_ep_keepalive_check(m_entity->ep(0), &m_ka, + m_pid, 0, NULL); + EXPECT_UCS_OK(status); + } + uct_keepalive_info_t *m_ka; pid_t m_pid; - static unsigned m_err_handler_count; + entity *m_entity; + unsigned m_err_handler_count; }; - -unsigned test_uct_keepalive::m_err_handler_count = 0; - - -UCS_TEST_F(test_uct_keepalive, ep_check)
+UCS_TEST_P(test_uct_keepalive, ep_check) { - uct_base_iface_t iface = {}; - uct_ep_t ep = {}; - - iface.err_handler = err_handler_cb; - iface.err_handler_arg = &m_err_handler_count; - ep.iface = &iface.super; - for (unsigned i = 0; i < 10; ++i) { - ucs_status_t status = uct_ep_keepalive_check(&ep, &m_ka, m_pid, 0, - NULL); - EXPECT_UCS_OK(status); + do_keepalive(); } + EXPECT_EQ(0u, m_err_handler_count); /* change start time saved in KA to force an error from EP check */ m_ka->start_time--; - ucs_status_t status = uct_ep_keepalive_check(&ep, &m_ka, m_pid, 0, NULL); - EXPECT_EQ(UCS_ERR_ENDPOINT_TIMEOUT, status); + do_keepalive(); + EXPECT_EQ(0u, m_err_handler_count); + + progress(); EXPECT_EQ(1u, m_err_handler_count); } +// Need to instantiate only on one transport +_UCT_INSTANTIATE_TEST_CASE(test_uct_keepalive, posix) + class test_uct_peer_failure_keepalive : public test_uct_peer_failure { From 87b038942b691fdaebadc2dadd7458d4aadb9fe3 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Fri, 20 Aug 2021 21:28:26 +0300 Subject: [PATCH 2/2] UCT/UCS/KEEPALIVE: Fix overflow when recording file create time Don't convert file time to CPU time. File timestamp is seconds since 1970. We should not convert it to ucs_time_t since it can overflow 64 bits. 
--- src/ucs/sys/sys.c | 8 ++++---- src/ucs/sys/sys.h | 8 ++++---- src/ucs/time/time.h | 9 --------- src/uct/base/uct_iface.c | 23 ++++++++++++----------- src/uct/base/uct_iface.h | 6 +++--- test/gtest/uct/test_peer_failure.cc | 4 ++-- 6 files changed, 25 insertions(+), 33 deletions(-) diff --git a/src/ucs/sys/sys.c b/src/ucs/sys/sys.c index ac4d0d32206..59836aaa51c 100644 --- a/src/ucs/sys/sys.c +++ b/src/ucs/sys/sys.c @@ -1463,7 +1463,7 @@ ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx) } ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type, - ucs_time_t *filetime) + struct timespec *ts) { struct stat stat_buf; int res; @@ -1475,13 +1475,13 @@ ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type, switch (type) { case UCS_SYS_FILE_TIME_CTIME: - *filetime = ucs_time_from_timespec(&stat_buf.st_ctim); + *ts = stat_buf.st_ctim; return UCS_OK; case UCS_SYS_FILE_TIME_ATIME: - *filetime = ucs_time_from_timespec(&stat_buf.st_atim); + *ts = stat_buf.st_atim; return UCS_OK; case UCS_SYS_FILE_TIME_MTIME: - *filetime = ucs_time_from_timespec(&stat_buf.st_mtim); + *ts = stat_buf.st_mtim; return UCS_OK; default: return UCS_ERR_INVALID_PARAM; diff --git a/src/ucs/sys/sys.h b/src/ucs/sys/sys.h index 8fc7bd94e7d..fddf3b2def4 100644 --- a/src/ucs/sys/sys.h +++ b/src/ucs/sys/sys.h @@ -599,14 +599,14 @@ ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx); /** * Get file time * - * @param [in] name File name - * @param [in] type Type of file time information - * @param [out] ctime File creation time + * @param [in] name File name + * @param [in] type Type of file time information + * @param [out] ts File time information * * @return UCS_OK if file is found and got information. 
*/ ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type, - ucs_time_t *time); + struct timespec *ts); END_C_DECLS diff --git a/src/ucs/time/time.h b/src/ucs/time/time.h index 775c728ef72..11937541a99 100644 --- a/src/ucs/time/time.h +++ b/src/ucs/time/time.h @@ -76,15 +76,6 @@ static inline ucs_time_t ucs_time_from_sec(double sec) } -/** - * Convert POSIX timespec to UCS time units. - */ -static inline ucs_time_t ucs_time_from_timespec(struct timespec *ts) -{ - return ucs_time_from_sec((double)ts->tv_sec + - (double)ts->tv_nsec / UCS_NSEC_PER_SEC); -} - /** * Convert seconds to UCS time units. */ diff --git a/src/uct/base/uct_iface.c b/src/uct/base/uct_iface.c index d0d5da013ef..6a193c69971 100644 --- a/src/uct/base/uct_iface.c +++ b/src/uct/base/uct_iface.c @@ -736,7 +736,6 @@ int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid) ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p) { uct_keepalive_info_t *ka; - ucs_time_t start_time; ucs_status_t status; int proc_len; @@ -757,15 +756,13 @@ ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p) uct_ep_get_process_proc_dir(ka->proc, proc_len + 1, pid); status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME, - &start_time); + &ka->start_time); if (status != UCS_OK) { ucs_error("failed to get process start time"); goto err_free_ka; } - ka->start_time = start_time; - *ka_p = ka; - + *ka_p = ka; return UCS_OK; err_free_ka: @@ -797,21 +794,25 @@ static ucs_status_t uct_iface_schedule_ep_err(uct_ep_h ep, ucs_status_t status) return UCS_OK; } -ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka, +ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p, pid_t pid, unsigned flags, uct_completion_t *comp) { + struct timespec create_time; + uct_keepalive_info_t *ka; ucs_status_t status; - ucs_time_t create_time; UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp); - if (*ka == NULL) { - status = 
uct_ep_keepalive_create(pid, ka); + if (*ka_p == NULL) { + status = uct_ep_keepalive_create(pid, ka_p); } else { - status = ucs_sys_get_file_time((*ka)->proc, UCS_SYS_FILE_TIME_CTIME, + ka = *ka_p; + status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME, &create_time); - if ((status != UCS_OK) || ((*ka)->start_time != create_time)) { + if ((status != UCS_OK) || + (ka->start_time.tv_sec != create_time.tv_sec) || + (ka->start_time.tv_nsec != create_time.tv_nsec)) { status = UCS_ERR_ENDPOINT_TIMEOUT; } } diff --git a/src/uct/base/uct_iface.h b/src/uct/base/uct_iface.h index e86eee101d5..2520c6df2b9 100644 --- a/src/uct/base/uct_iface.h +++ b/src/uct/base/uct_iface.h @@ -290,8 +290,8 @@ typedef struct uct_failed_iface { * Keepalive info used by EP */ typedef struct uct_keepalive_info { - ucs_time_t start_time; /* Process start time */ - char proc[]; /* Process owner proc dir */ + struct timespec start_time; /* Process start time */ + char proc[]; /* Process owner proc dir */ } uct_keepalive_info_t; @@ -842,7 +842,7 @@ int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid); ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p); -ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka, +ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p, pid_t pid, unsigned flags, uct_completion_t *comp); diff --git a/test/gtest/uct/test_peer_failure.cc b/test/gtest/uct/test_peer_failure.cc index e264c1cbad4..88bd1912401 100644 --- a/test/gtest/uct/test_peer_failure.cc +++ b/test/gtest/uct/test_peer_failure.cc @@ -525,7 +525,7 @@ UCS_TEST_P(test_uct_keepalive, ep_check) EXPECT_EQ(0u, m_err_handler_count); /* change start time saved in KA to force an error from EP check */ - m_ka->start_time--; + m_ka->start_time.tv_sec--; do_keepalive(); EXPECT_EQ(0u, m_err_handler_count); @@ -572,7 +572,7 @@ class test_uct_peer_failure_keepalive : public test_uct_peer_failure } if (ka_info != NULL) { - 
ka_info->start_time--; + ka_info->start_time.tv_sec--; } test_uct_peer_failure::kill_receiver();