Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/SM/CUDA: Fix common intra-node keepalive protocol - v1.12.x #7841

Merged
merged 2 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
* Fixed fallback to PUT pipeline in rendezvous protocol
* Reduced default value of keep-alive interval to 20 seconds
#### UCT
* Fixed keep-alive protocol for intra-node transports (sm, cuda)
* Fixed deadlock in TCP
* Suppressed EHOSTUNREACH error in TCP sockcm
* Restricted connecting loop-back to other devices in TCP
Expand Down
61 changes: 35 additions & 26 deletions src/ucs/sys/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#define UCS_PROCESS_NS_DIR "/proc/self/ns"
#define UCS_PROCESS_BOOTID_FILE "/proc/sys/kernel/random/boot_id"
#define UCS_PROCESS_BOOTID_FMT "%x-%4hx-%4hx-%4hx-%2hhx%2hhx%2hhx%2hhx%2hhx%2hhx"
#define UCS_PROCCESS_STAT_FMT "/proc/%d/stat"
#define UCS_PROCESS_NS_FIRST 0xF0000000U
#define UCS_PROCESS_NS_NET_DFLT 0xF0000080U

Expand Down Expand Up @@ -1462,32 +1463,6 @@ ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx)
return ucs_sys_readdir(task_dir, &ucs_sys_enum_threads_cb, &param);
}

ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type,
struct timespec *ts)
{
struct stat stat_buf;
int res;

res = stat(name, &stat_buf);
if (res != 0) {
return UCS_ERR_IO_ERROR; /* failed to get file info */
}

switch (type) {
case UCS_SYS_FILE_TIME_CTIME:
*ts = stat_buf.st_ctim;
return UCS_OK;
case UCS_SYS_FILE_TIME_ATIME:
*ts = stat_buf.st_atim;
return UCS_OK;
case UCS_SYS_FILE_TIME_MTIME:
*ts = stat_buf.st_mtim;
return UCS_OK;
default:
return UCS_ERR_INVALID_PARAM;
}
}

ucs_status_t ucs_sys_check_fd_limit_per_process()
{
int fd;
Expand Down Expand Up @@ -1541,3 +1516,37 @@ long ucs_sys_get_num_cpus()

return num_cpus;
}

unsigned long ucs_sys_get_proc_create_time(pid_t pid)
{
char stat[1024];
char *start_str;
ssize_t size;
unsigned long stime;
int res;

size = ucs_read_file_str(stat, sizeof(stat), 1, UCS_PROCCESS_STAT_FMT, pid);
if (size < 0) {
goto err;
}

/* Start sscanf right after the executable name which may contain spaces or
* brackets itself */
start_str = strrchr(stat, ')');
if (start_str == NULL) {
goto scan_err;
}

res = sscanf(start_str, ") %*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %*u"
"%*u %*d %*d %*d %*d %*d %*d %lu", &stime);
if (res == 1) {
return stime;
}

scan_err:
ucs_error("failed to scan "UCS_PROCCESS_STAT_FMT, pid);
err:
return 0ul;
}


31 changes: 10 additions & 21 deletions src/ucs/sys/sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ typedef enum {
} ucs_sys_vma_info_flags_t;


/* file time information */
typedef enum {
UCS_SYS_FILE_TIME_CTIME, /**< create time */
UCS_SYS_FILE_TIME_ATIME, /**< access time */
UCS_SYS_FILE_TIME_MTIME /**< modification time */
} ucs_sys_file_time_t;


/* information about virtual memory area */
typedef struct {
unsigned long start;
Expand Down Expand Up @@ -596,19 +588,6 @@ ucs_status_t ucs_sys_readdir(const char *path, ucs_sys_readdir_cb_t cb, void *ct
ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx);


/**
* Get file time
*
* @param [in] name File name
* @param [in] type Type of file time information
* @param [out] ts File time information
*
* @return UCS_OK if file is found and got information.
*/
ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type,
struct timespec *ts);


/**
* Check the per-process limit on the number of open file descriptors.
*
Expand Down Expand Up @@ -639,6 +618,16 @@ ucs_status_t ucs_pthread_create(pthread_t *thread_id_p,
*/
long ucs_sys_get_num_cpus();


/*
* Get process creation time.
*
* @param [in] pid Process id to get start time.
*
* @return The time the process started after system boot or 0 in case of error.
*/
unsigned long ucs_sys_get_proc_create_time(pid_t pid);

END_C_DECLS

#endif
83 changes: 19 additions & 64 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -787,52 +787,6 @@ ucs_status_t uct_base_ep_am_short_iov(uct_ep_h ep, uint8_t id, const uct_iov_t *
return status;
}

int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid)
{
ucs_assert((buffer != NULL) || (max_len == 0));
/* cppcheck-suppress nullPointer */
/* cppcheck-suppress ctunullpointer */
return snprintf(buffer, max_len, "/proc/%d", (int)pid);
}

ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p)
{
uct_keepalive_info_t *ka;
ucs_status_t status;
int proc_len;

proc_len = uct_ep_get_process_proc_dir(NULL, 0, pid);
if (proc_len <= 0) {
ucs_error("failed to get length to hold path to a process directory");
status = UCS_ERR_NO_MEMORY;
goto err;
}

ka = ucs_malloc(sizeof(*ka) + proc_len + 1, "keepalive");
if (ka == NULL) {
ucs_error("failed to allocate keepalive info");
status = UCS_ERR_NO_MEMORY;
goto err;
}

uct_ep_get_process_proc_dir(ka->proc, proc_len + 1, pid);

status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME,
&ka->start_time);
if (status != UCS_OK) {
ucs_error("failed to get process start time");
goto err_free_ka;
}

*ka_p = ka;
return UCS_OK;

err_free_ka:
ucs_free(ka);
err:
return status;
}

static ucs_status_t uct_iface_schedule_ep_err(uct_ep_h ep, ucs_status_t status)
{
uct_base_iface_t *iface = ucs_derived_of(ep->iface, uct_base_iface_t);
Expand All @@ -856,31 +810,32 @@ static ucs_status_t uct_iface_schedule_ep_err(uct_ep_h ep, ucs_status_t status)
return UCS_OK;
}

ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p,
ucs_status_t uct_ep_keepalive_init(uct_keepalive_info_t *ka, pid_t pid)
{
ka->start_time = ucs_sys_get_proc_create_time(pid);
if (ka->start_time == 0) {
ucs_diag("failed to get start time for pid %d", pid);
return UCS_ERR_ENDPOINT_TIMEOUT;
}

return UCS_OK;
}

ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t *ka,
pid_t pid, unsigned flags,
uct_completion_t *comp)
{
struct timespec create_time;
uct_keepalive_info_t *ka;
ucs_status_t status;
unsigned long start_time;

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);

if (*ka_p == NULL) {
status = uct_ep_keepalive_create(pid, ka_p);
} else {
ka = *ka_p;
status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME,
&create_time);
if ((status != UCS_OK) ||
(ka->start_time.tv_sec != create_time.tv_sec) ||
(ka->start_time.tv_nsec != create_time.tv_nsec)) {
status = UCS_ERR_ENDPOINT_TIMEOUT;
}
}
ucs_assert(ka->start_time != 0);

if (status != UCS_OK) {
return uct_iface_schedule_ep_err(ep, status);
start_time = ucs_sys_get_proc_create_time(pid);
if (ka->start_time != start_time) {
ucs_diag("ka failed for pid %d start time %lu != %lu", pid,
ka->start_time, start_time);
return uct_iface_schedule_ep_err(ep, UCS_ERR_ENDPOINT_TIMEOUT);
}

return UCS_OK;
Expand Down
7 changes: 3 additions & 4 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,7 @@ typedef struct uct_failed_iface {
* Keepalive info used by EP
*/
typedef struct uct_keepalive_info {
struct timespec start_time; /* Process start time */
char proc[]; /* Process owner proc dir */
unsigned long start_time; /* Process start time */
} uct_keepalive_info_t;


Expand Down Expand Up @@ -849,9 +848,9 @@ ucs_status_t uct_base_ep_am_short_iov(uct_ep_h ep, uint8_t id, const uct_iov_t *

int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid);

ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p);
ucs_status_t uct_ep_keepalive_init(uct_keepalive_info_t *ka, pid_t pid);

ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p,
ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t *ka,
pid_t pid, unsigned flags,
uct_completion_t *comp);

Expand Down
4 changes: 1 addition & 3 deletions src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_ep_t, const uct_ep_params_t *params)
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);

self->remote_pid = *(const pid_t*)params->iface_addr;
self->keepalive = NULL;

return UCS_OK;
return uct_ep_keepalive_init(&self->keepalive, self->remote_pid);
}

static UCS_CLASS_CLEANUP_FUNC(uct_cuda_ipc_ep_t)
{
ucs_free(self->keepalive);
}

UCS_CLASS_DEFINE(uct_cuda_ipc_ep_t, uct_base_ep_t)
Expand Down
2 changes: 1 addition & 1 deletion src/uct/cuda/cuda_ipc/cuda_ipc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
typedef struct uct_cuda_ipc_ep {
uct_base_ep_t super;
pid_t remote_pid;
uct_keepalive_info_t *keepalive; /* keepalive metadata */
uct_keepalive_info_t keepalive; /* keepalive metadata */
} uct_cuda_ipc_ep_t;


Expand Down
32 changes: 21 additions & 11 deletions src/uct/sm/mm/base/mm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ static void uct_mm_ep_signal_remote(uct_mm_ep_t *ep)
}
}

void uct_mm_ep_cleanup_remote_segs(uct_mm_ep_t *ep)
{
uct_mm_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_mm_iface_t);
uct_mm_remote_seg_t remote_seg;

kh_foreach_value(&ep->remote_segs, remote_seg, {
uct_mm_iface_mapper_call(iface, mem_detach, &remote_seg);
})

kh_destroy_inplace(uct_mm_remote_seg, &ep->remote_segs);
}

static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params)
{
uct_mm_iface_t *iface = ucs_derived_of(params->iface, uct_mm_iface_t);
Expand Down Expand Up @@ -163,14 +176,20 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params)
/* Initialize remote FIFO control structure */
uct_mm_iface_set_fifo_ptrs(fifo_ptr, &self->fifo_ctl, &self->fifo_elems);
self->cached_tail = self->fifo_ctl->tail;
self->keepalive = NULL;
ucs_arbiter_elem_init(&self->arb_elem);

status = uct_ep_keepalive_init(&self->keepalive, self->fifo_ctl->pid);
if (status != UCS_OK) {
goto err_free_segs;
}

ucs_debug("created mm ep %p, connected to remote FIFO id 0x%"PRIx64,
self, addr->fifo_seg_id);

return UCS_OK;

err_free_segs:
uct_mm_ep_cleanup_remote_segs(self);
err_free_md_addr:
ucs_free(self->remote_iface_addr);
err:
Expand All @@ -179,18 +198,9 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params)

static UCS_CLASS_CLEANUP_FUNC(uct_mm_ep_t)
{
uct_mm_iface_t *iface = ucs_derived_of(self->super.super.iface, uct_mm_iface_t);
uct_mm_remote_seg_t remote_seg;

ucs_free(self->keepalive);
uct_mm_ep_pending_purge(&self->super.super, NULL, NULL);

kh_foreach_value(&self->remote_segs, remote_seg, {
uct_mm_iface_mapper_call(iface, mem_detach, &remote_seg);
})

uct_mm_ep_cleanup_remote_segs(self);
ucs_free(self->remote_iface_addr);
kh_destroy_inplace(uct_mm_remote_seg, &self->remote_segs);
}

UCS_CLASS_DEFINE(uct_mm_ep_t, uct_base_ep_t)
Expand Down
2 changes: 1 addition & 1 deletion src/uct/sm/mm/base/mm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ typedef struct uct_mm_ep {
the interface as long as one of the endpoints is unable to send */
ucs_arbiter_elem_t arb_elem;

uct_keepalive_info_t *keepalive; /* keepalive info */
uct_keepalive_info_t keepalive; /* keepalive info */
} uct_mm_ep_t;


Expand Down
8 changes: 3 additions & 5 deletions src/uct/sm/scopy/cma/cma_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ static UCS_CLASS_INIT_FUNC(uct_cma_ep_t, const uct_ep_params_t *params)
UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
UCS_CLASS_CALL_SUPER_INIT(uct_scopy_ep_t, params);

self->remote_pid = *(const pid_t*)params->iface_addr &
~UCT_CMA_IFACE_ADDR_FLAG_PID_NS;
self->keepalive = NULL;
self->remote_pid = *(const pid_t*)params->iface_addr &
~UCT_CMA_IFACE_ADDR_FLAG_PID_NS;

return UCS_OK;
return uct_ep_keepalive_init(&self->keepalive, self->remote_pid);
}

static UCS_CLASS_CLEANUP_FUNC(uct_cma_ep_t)
{
ucs_free(self->keepalive);
}

UCS_CLASS_DEFINE(uct_cma_ep_t, uct_scopy_ep_t)
Expand Down
2 changes: 1 addition & 1 deletion src/uct/sm/scopy/cma/cma_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
typedef struct uct_cma_ep {
uct_scopy_ep_t super;
pid_t remote_pid;
uct_keepalive_info_t *keepalive;
uct_keepalive_info_t keepalive;
} uct_cma_ep_t;


Expand Down
Loading