Skip to content

Commit

Permalink
Merge pull request #7276 from yosefe/topic/uct-keepalive-fixes-v1.11.x
Browse files Browse the repository at this point in the history
Port keepalive fixes to v1.11.x branch
  • Loading branch information
yosefe authored Aug 22, 2021
2 parents 5d8c109 + 87b0389 commit 4e68fa1
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 71 deletions.
4 changes: 3 additions & 1 deletion contrib/test_jenkins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,9 @@ run_ucp_hello() {
export UCX_KEEPALIVE_INTERVAL=1s
export UCX_KEEPALIVE_NUM_EPS=10
export UCX_LOG_LEVEL=info
export UCX_MM_ERROR_HANDLING=y

for tls in all tcp,cuda
for tls in all tcp,cuda shm,cuda
do
export UCX_TLS=${tls}
for test_mode in -w -f -b -erecv -esend -ekeepalive
Expand All @@ -522,6 +523,7 @@ run_ucp_hello() {
unset UCX_KEEPALIVE_NUM_EPS
unset UCX_LOG_LEVEL
unset UCX_TLS
unset UCX_MM_ERROR_HANDLING
}

#
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2928,6 +2928,9 @@ int ucp_ep_do_keepalive(ucp_ep_h ep)
ucs_assert((rsc_index != UCP_NULL_RESOURCE) ||
(lane == ucp_ep_get_cm_lane(ep)));

ucs_trace("ep %p: do keepalive on lane[%d]=%p ep->flags=0x%x", ep, lane,
ep->uct_eps[lane], ep->flags);

status = ucp_ep_do_uct_ep_keepalive(ep, ep->uct_eps[lane], rsc_index, 0,
NULL);
if (status == UCS_ERR_NO_RESOURCE) {
Expand Down
8 changes: 4 additions & 4 deletions src/ucs/sys/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx)
}

ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type,
ucs_time_t *filetime)
struct timespec *ts)
{
struct stat stat_buf;
int res;
Expand All @@ -1475,13 +1475,13 @@ ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type,

switch (type) {
case UCS_SYS_FILE_TIME_CTIME:
*filetime = ucs_time_from_timespec(&stat_buf.st_ctim);
*ts = stat_buf.st_ctim;
return UCS_OK;
case UCS_SYS_FILE_TIME_ATIME:
*filetime = ucs_time_from_timespec(&stat_buf.st_atim);
*ts = stat_buf.st_atim;
return UCS_OK;
case UCS_SYS_FILE_TIME_MTIME:
*filetime = ucs_time_from_timespec(&stat_buf.st_mtim);
*ts = stat_buf.st_mtim;
return UCS_OK;
default:
return UCS_ERR_INVALID_PARAM;
Expand Down
8 changes: 4 additions & 4 deletions src/ucs/sys/sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,14 +599,14 @@ ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx);
/**
* Get file time
*
* @param [in] name File name
* @param [in] type Type of file time information
* @param [out] ctime File creation time
* @param [in] name File name
* @param [in] type Type of file time information
* @param [out] ts File time information
*
* @return UCS_OK if file is found and got information.
*/
ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type,
ucs_time_t *time);
struct timespec *ts);

END_C_DECLS

Expand Down
9 changes: 0 additions & 9 deletions src/ucs/time/time.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,6 @@ static inline ucs_time_t ucs_time_from_sec(double sec)
}


/**
 * Convert POSIX timespec to UCS time units.
 *
 * @param [in] ts  Time value to convert. It is only read, never modified.
 *
 * @return The value of @a ts in UCS time units, as produced by
 *         ucs_time_from_sec().
 *
 * @note Seconds and nanoseconds are summed into a single double before the
 *       conversion, so the result may not retain full nanosecond resolution
 *       when tv_sec is large.
 */
static inline ucs_time_t ucs_time_from_timespec(const struct timespec *ts)
{
    return ucs_time_from_sec((double)ts->tv_sec +
                             (double)ts->tv_nsec / UCS_NSEC_PER_SEC);
}

/**
* Convert seconds to UCS time units.
*/
Expand Down
101 changes: 81 additions & 20 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
#include <ucs/vfs/base/vfs_obj.h>


/**
 * Endpoint error information handed to the error-handling progress callback.
 */
typedef struct uct_base_ep_error_handle_info {
/* Endpoint the error is reported for */
uct_ep_h ep;
/* Status passed to the interface error handler */
ucs_status_t status;
} uct_base_ep_error_handle_info_t;


#ifdef ENABLE_STATS
static ucs_stats_class_t uct_ep_stats_class = {
.name = "uct_ep",
Expand Down Expand Up @@ -162,7 +168,7 @@ void uct_iface_set_async_event_params(const uct_iface_params_t *params,
void **event_arg)
{
*event_cb = UCT_IFACE_PARAM_VALUE(params, async_event_cb, ASYNC_EVENT_CB,
NULL);
NULL);
*event_arg = UCT_IFACE_PARAM_VALUE(params, async_event_arg, ASYNC_EVENT_ARG,
NULL);
}
Expand Down Expand Up @@ -580,6 +586,33 @@ UCS_CLASS_CLEANUP_FUNC(uct_ep_t)

UCS_CLASS_DEFINE(uct_ep_t, void);

/**
 * Progress callback which reports a recorded endpoint error to the error
 * handler of the interface that owns the endpoint.
 *
 * @param [in] arg  Pointer to a uct_base_ep_error_handle_info_t with the
 *                  endpoint and status to report. It is released with
 *                  ucs_free() after the error handler returns.
 *
 * @return Always 1.
 */
static unsigned uct_iface_ep_error_handle_progress(void *arg)
{
    uct_base_ep_error_handle_info_t *info = arg;
    uct_base_iface_t *owner = ucs_derived_of(info->ep->iface, uct_base_iface_t);

    owner->err_handler(owner->err_handler_arg, info->ep, info->status);
    ucs_free(info);

    return 1;
}

/**
 * Callback-queue predicate which selects error-handling callbacks that were
 * scheduled for a given endpoint.
 *
 * @param [in] elem  Callback queue element being examined.
 * @param [in] arg   Pointer to the uct_base_ep_t to match against.
 *
 * @return 1 if the element runs uct_iface_ep_error_handle_progress() for the
 *         given endpoint (its error info is freed here), 0 otherwise.
 */
static int
uct_iface_ep_error_handle_progress_remove(const ucs_callbackq_elem_t *elem,
                                          void *arg)
{
    uct_base_ep_t *base_ep = arg;
    uct_base_ep_error_handle_info_t *pending;

    /* Only elements of our own callback carry an error info argument */
    if (elem->cb != uct_iface_ep_error_handle_progress) {
        return 0;
    }

    pending = elem->arg;
    if (pending->ep != &base_ep->super) {
        return 0;
    }

    ucs_free(pending);
    return 1;
}

UCS_CLASS_INIT_FUNC(uct_base_ep_t, uct_base_iface_t *iface)
{
UCS_CLASS_CALL_SUPER_INIT(uct_ep_t, &iface->super);
Expand All @@ -590,6 +623,11 @@ UCS_CLASS_INIT_FUNC(uct_base_ep_t, uct_base_iface_t *iface)

/*
 * Cleanup for uct_base_ep_t: removes from the worker progress queue every
 * element accepted by uct_iface_ep_error_handle_progress_remove() for this
 * endpoint, then frees the endpoint's stats node.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_base_ep_t)
{
    uct_base_iface_t *base_iface;

    base_iface = ucs_derived_of(self->super.iface, uct_base_iface_t);
    ucs_callbackq_remove_if(&base_iface->worker->super.progress_q,
                            uct_iface_ep_error_handle_progress_remove, self);

    UCS_STATS_NODE_FREE(self->stats);
}

Expand Down Expand Up @@ -698,7 +736,6 @@ int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid)
ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p)
{
uct_keepalive_info_t *ka;
ucs_time_t start_time;
ucs_status_t status;
int proc_len;

Expand All @@ -719,15 +756,13 @@ ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p)
uct_ep_get_process_proc_dir(ka->proc, proc_len + 1, pid);

status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME,
&start_time);
&ka->start_time);
if (status != UCS_OK) {
ucs_error("failed to get process start time");
goto err_free_ka;
}

ka->start_time = start_time;
*ka_p = ka;

*ka_p = ka;
return UCS_OK;

err_free_ka:
Expand All @@ -736,30 +771,56 @@ ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p)
return status;
}

ucs_status_t
uct_ep_keepalive_check(uct_ep_h tl_ep, uct_keepalive_info_t **ka, pid_t pid,
unsigned flags, uct_completion_t *comp)
/**
 * Schedule the interface error handler to be invoked for an endpoint from the
 * worker progress queue, instead of calling it directly.
 *
 * @param [in] ep      Endpoint the error is reported for.
 * @param [in] status  Error status to pass to the error handler.
 *
 * @return UCS_ERR_NO_MEMORY if the error info could not be allocated.
 *         UCS_OK otherwise, including when the interface has no error
 *         handler; in that case the error is only logged.
 */
static ucs_status_t uct_iface_schedule_ep_err(uct_ep_h ep, ucs_status_t status)
{
    uct_base_iface_t *base_iface = ucs_derived_of(ep->iface, uct_base_iface_t);
    uct_base_ep_error_handle_info_t *pending;

    if (base_iface->err_handler != NULL) {
        pending = ucs_malloc(sizeof(*pending), "uct_base_ep_err");
        if (pending == NULL) {
            return UCS_ERR_NO_MEMORY;
        }

        pending->status = status;
        pending->ep     = ep;

        /* One-shot element: uct_iface_ep_error_handle_progress() frees it */
        ucs_callbackq_add_safe(&base_iface->worker->super.progress_q,
                               uct_iface_ep_error_handle_progress, pending,
                               UCS_CALLBACKQ_FLAG_ONESHOT);
    } else {
        ucs_diag("ep %p: unhandled error %s", ep, ucs_status_string(status));
    }

    return UCS_OK;
}

ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p,
pid_t pid, unsigned flags,
uct_completion_t *comp)
{
struct timespec create_time;
uct_keepalive_info_t *ka;
ucs_status_t status;
ucs_time_t create_time;

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);

if (ucs_unlikely(*ka == NULL)) {
status = uct_ep_keepalive_create(pid, ka);
if (status != UCS_OK) {
return uct_iface_handle_ep_err(tl_ep->iface, tl_ep, status);
}
if (*ka_p == NULL) {
status = uct_ep_keepalive_create(pid, ka_p);
} else {
status = ucs_sys_get_file_time((*ka)->proc, UCS_SYS_FILE_TIME_CTIME,
ka = *ka_p;
status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME,
&create_time);
if (ucs_unlikely((status != UCS_OK) ||
((*ka)->start_time != create_time))) {
return uct_iface_handle_ep_err(tl_ep->iface, tl_ep,
UCS_ERR_ENDPOINT_TIMEOUT);
if ((status != UCS_OK) ||
(ka->start_time.tv_sec != create_time.tv_sec) ||
(ka->start_time.tv_nsec != create_time.tv_nsec)) {
status = UCS_ERR_ENDPOINT_TIMEOUT;
}
}

if (status != UCS_OK) {
return uct_iface_schedule_ep_err(ep, status);
}

return UCS_OK;
}

Expand Down
10 changes: 5 additions & 5 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ typedef struct uct_failed_iface {
* Keepalive info used by EP
*/
typedef struct uct_keepalive_info {
ucs_time_t start_time; /* Process start time */
char proc[]; /* Process owner proc dir */
struct timespec start_time; /* Process start time */
char proc[]; /* Process owner proc dir */
} uct_keepalive_info_t;


Expand Down Expand Up @@ -842,9 +842,9 @@ int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid);

ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p);

ucs_status_t
uct_ep_keepalive_check(uct_ep_h tl_ep, uct_keepalive_info_t **ka, pid_t pid,
unsigned flags, uct_completion_t *comp);
ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p,
pid_t pid, unsigned flags,
uct_completion_t *comp);

void uct_ep_set_iface(uct_ep_h ep, uct_iface_t *iface);

Expand Down
60 changes: 32 additions & 28 deletions test/gtest/uct/test_peer_failure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -473,66 +473,70 @@ UCS_TEST_SKIP_COND_P(test_uct_peer_failure_multiple, test,

UCT_INSTANTIATE_TEST_CASE(test_uct_peer_failure_multiple)

class test_uct_keepalive : public ucs::test {
class test_uct_keepalive : public uct_test {
public:
test_uct_keepalive()
test_uct_keepalive() :
m_ka(NULL), m_pid(getpid()), m_entity(NULL), m_err_handler_count(0)
{
m_ka = NULL;
m_pid = getpid();
}

void init()
{
m_err_handler_count = 0;

ASSERT_UCS_OK(uct_ep_keepalive_create(m_pid, &m_ka));

m_entity = create_entity(0, err_handler_cb);
m_entity->connect(0, *m_entity, 0);
m_entities.push_back(m_entity);
}

void cleanup()
{
m_entities.clear();
ucs_free(m_ka);
}

static ucs_status_t
err_handler_cb(void *arg, uct_ep_h ep, ucs_status_t status)
{
m_err_handler_count++;
return status;
test_uct_keepalive *self = reinterpret_cast<test_uct_keepalive*>(arg);
self->m_err_handler_count++;
return UCS_OK;
}

protected:
void do_keepalive()
{
ucs_status_t status = uct_ep_keepalive_check(m_entity->ep(0), &m_ka,
m_pid, 0, NULL);
EXPECT_UCS_OK(status);
}

uct_keepalive_info_t *m_ka;
pid_t m_pid;
static unsigned m_err_handler_count;
entity *m_entity;
unsigned m_err_handler_count;
};


unsigned test_uct_keepalive::m_err_handler_count = 0;


UCS_TEST_F(test_uct_keepalive, ep_check)
UCS_TEST_P(test_uct_keepalive, ep_check)
{
uct_base_iface_t iface = {};
uct_ep_t ep = {};

iface.err_handler = err_handler_cb;
iface.err_handler_arg = &m_err_handler_count;
ep.iface = &iface.super;

for (unsigned i = 0; i < 10; ++i) {
ucs_status_t status = uct_ep_keepalive_check(&ep, &m_ka, m_pid, 0,
NULL);
EXPECT_UCS_OK(status);
do_keepalive();
}
EXPECT_EQ(0u, m_err_handler_count);

/* change start time saved in KA to force an error from EP check */
m_ka->start_time--;
m_ka->start_time.tv_sec--;

ucs_status_t status = uct_ep_keepalive_check(&ep, &m_ka, m_pid, 0, NULL);
EXPECT_EQ(UCS_ERR_ENDPOINT_TIMEOUT, status);
do_keepalive();
EXPECT_EQ(0u, m_err_handler_count);

progress();
EXPECT_EQ(1u, m_err_handler_count);
}

// Need to instantiate only on one transport
_UCT_INSTANTIATE_TEST_CASE(test_uct_keepalive, posix)


class test_uct_peer_failure_keepalive : public test_uct_peer_failure
{
Expand Down Expand Up @@ -568,7 +572,7 @@ class test_uct_peer_failure_keepalive : public test_uct_peer_failure
}

if (ka_info != NULL) {
ka_info->start_time--;
ka_info->start_time.tv_sec--;
}

test_uct_peer_failure::kill_receiver();
Expand Down

0 comments on commit 4e68fa1

Please sign in to comment.