Skip to content

Commit

Permalink
UCT/SM/CUDA: Fix common intra-node keepalive protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
brminich committed Jan 3, 2022
1 parent af8bde5 commit 7135263
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 159 deletions.
61 changes: 35 additions & 26 deletions src/ucs/sys/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#define UCS_PROCESS_NS_DIR "/proc/self/ns"
#define UCS_PROCESS_BOOTID_FILE "/proc/sys/kernel/random/boot_id"
#define UCS_PROCESS_BOOTID_FMT "%x-%4hx-%4hx-%4hx-%2hhx%2hhx%2hhx%2hhx%2hhx%2hhx"
#define UCS_PROCCESS_STAT_FMT "/proc/%d/stat"
#define UCS_PROCESS_NS_FIRST 0xF0000000U
#define UCS_PROCESS_NS_NET_DFLT 0xF0000080U

Expand Down Expand Up @@ -1462,32 +1463,6 @@ ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx)
return ucs_sys_readdir(task_dir, &ucs_sys_enum_threads_cb, &param);
}

ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type,
struct timespec *ts)
{
struct stat stat_buf;
int res;

res = stat(name, &stat_buf);
if (res != 0) {
return UCS_ERR_IO_ERROR; /* failed to get file info */
}

switch (type) {
case UCS_SYS_FILE_TIME_CTIME:
*ts = stat_buf.st_ctim;
return UCS_OK;
case UCS_SYS_FILE_TIME_ATIME:
*ts = stat_buf.st_atim;
return UCS_OK;
case UCS_SYS_FILE_TIME_MTIME:
*ts = stat_buf.st_mtim;
return UCS_OK;
default:
return UCS_ERR_INVALID_PARAM;
}
}

ucs_status_t ucs_sys_check_fd_limit_per_process()
{
int fd;
Expand Down Expand Up @@ -1541,3 +1516,37 @@ long ucs_sys_get_num_cpus()

return num_cpus;
}

unsigned long ucs_sys_get_proc_create_time(pid_t pid)
{
char stat[1024];
char *start_str;
ssize_t size;
unsigned long stime;
int res;

size = ucs_read_file_str(stat, sizeof(stat), 1, UCS_PROCCESS_STAT_FMT, pid);
if (size < 0) {
goto err;
}

/* Start sscanf right after the executable name which may contain spaces or
* brackets itself */
start_str = strrchr(stat, ')');
if (start_str == NULL) {
goto scan_err;
}

res = sscanf(start_str, ") %*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %*u"
"%*u %*d %*d %*d %*d %*d %*d %lu", &stime);
if (res == 1) {
return stime;
}

scan_err:
ucs_error("failed to scan "UCS_PROCCESS_STAT_FMT, pid);
err:
return 0ul;
}


31 changes: 10 additions & 21 deletions src/ucs/sys/sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ typedef enum {
} ucs_sys_vma_info_flags_t;


/* file time information */
typedef enum {
UCS_SYS_FILE_TIME_CTIME, /**< create time */
UCS_SYS_FILE_TIME_ATIME, /**< access time */
UCS_SYS_FILE_TIME_MTIME /**< modification time */
} ucs_sys_file_time_t;


/* information about virtual memory area */
typedef struct {
unsigned long start;
Expand Down Expand Up @@ -596,19 +588,6 @@ ucs_status_t ucs_sys_readdir(const char *path, ucs_sys_readdir_cb_t cb, void *ct
ucs_status_t ucs_sys_enum_threads(ucs_sys_enum_threads_cb_t cb, void *ctx);


/**
* Get file time
*
* @param [in] name File name
* @param [in] type Type of file time information
* @param [out] ts File time information
*
* @return UCS_OK if file is found and got information.
*/
ucs_status_t ucs_sys_get_file_time(const char *name, ucs_sys_file_time_t type,
struct timespec *ts);


/**
* Check the per-process limit on the number of open file descriptors.
*
Expand Down Expand Up @@ -639,6 +618,16 @@ ucs_status_t ucs_pthread_create(pthread_t *thread_id_p,
*/
long ucs_sys_get_num_cpus();


/*
* Get process creation time.
*
* @param [in] pid Process id to get start time.
*
* @return The time the process started after system boot or 0 in case of error.
*/
unsigned long ucs_sys_get_proc_create_time(pid_t pid);

END_C_DECLS

#endif
83 changes: 19 additions & 64 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -787,52 +787,6 @@ ucs_status_t uct_base_ep_am_short_iov(uct_ep_h ep, uint8_t id, const uct_iov_t *
return status;
}

int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid)
{
ucs_assert((buffer != NULL) || (max_len == 0));
/* cppcheck-suppress nullPointer */
/* cppcheck-suppress ctunullpointer */
return snprintf(buffer, max_len, "/proc/%d", (int)pid);
}

ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p)
{
uct_keepalive_info_t *ka;
ucs_status_t status;
int proc_len;

proc_len = uct_ep_get_process_proc_dir(NULL, 0, pid);
if (proc_len <= 0) {
ucs_error("failed to get length to hold path to a process directory");
status = UCS_ERR_NO_MEMORY;
goto err;
}

ka = ucs_malloc(sizeof(*ka) + proc_len + 1, "keepalive");
if (ka == NULL) {
ucs_error("failed to allocate keepalive info");
status = UCS_ERR_NO_MEMORY;
goto err;
}

uct_ep_get_process_proc_dir(ka->proc, proc_len + 1, pid);

status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME,
&ka->start_time);
if (status != UCS_OK) {
ucs_error("failed to get process start time");
goto err_free_ka;
}

*ka_p = ka;
return UCS_OK;

err_free_ka:
ucs_free(ka);
err:
return status;
}

static ucs_status_t uct_iface_schedule_ep_err(uct_ep_h ep, ucs_status_t status)
{
uct_base_iface_t *iface = ucs_derived_of(ep->iface, uct_base_iface_t);
Expand All @@ -856,31 +810,32 @@ static ucs_status_t uct_iface_schedule_ep_err(uct_ep_h ep, ucs_status_t status)
return UCS_OK;
}

ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p,
ucs_status_t uct_ep_keepalive_init(uct_keepalive_info_t *ka, pid_t pid)
{
ka->start_time = ucs_sys_get_proc_create_time(pid);
if (ka->start_time == 0) {
ucs_diag("failed to get start time for pid %d", pid);
return UCS_ERR_ENDPOINT_TIMEOUT;
}

return UCS_OK;
}

ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t *ka,
pid_t pid, unsigned flags,
uct_completion_t *comp)
{
struct timespec create_time;
uct_keepalive_info_t *ka;
ucs_status_t status;
unsigned long start_time;

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);

if (*ka_p == NULL) {
status = uct_ep_keepalive_create(pid, ka_p);
} else {
ka = *ka_p;
status = ucs_sys_get_file_time(ka->proc, UCS_SYS_FILE_TIME_CTIME,
&create_time);
if ((status != UCS_OK) ||
(ka->start_time.tv_sec != create_time.tv_sec) ||
(ka->start_time.tv_nsec != create_time.tv_nsec)) {
status = UCS_ERR_ENDPOINT_TIMEOUT;
}
}
ucs_assert(ka->start_time != 0);

if (status != UCS_OK) {
return uct_iface_schedule_ep_err(ep, status);
start_time = ucs_sys_get_proc_create_time(pid);
if (ka->start_time != start_time) {
ucs_diag("ka failed for pid %d start time %lu != %lu", pid,
ka->start_time, start_time);
return uct_iface_schedule_ep_err(ep, UCS_ERR_ENDPOINT_TIMEOUT);
}

return UCS_OK;
Expand Down
7 changes: 3 additions & 4 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,7 @@ typedef struct uct_failed_iface {
* Keepalive info used by EP
*/
typedef struct uct_keepalive_info {
struct timespec start_time; /* Process start time */
char proc[]; /* Process owner proc dir */
unsigned long start_time; /* Process start time */
} uct_keepalive_info_t;


Expand Down Expand Up @@ -849,9 +848,9 @@ ucs_status_t uct_base_ep_am_short_iov(uct_ep_h ep, uint8_t id, const uct_iov_t *

int uct_ep_get_process_proc_dir(char *buffer, size_t max_len, pid_t pid);

ucs_status_t uct_ep_keepalive_create(pid_t pid, uct_keepalive_info_t **ka_p);
ucs_status_t uct_ep_keepalive_init(uct_keepalive_info_t *ka, pid_t pid);

ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t **ka_p,
ucs_status_t uct_ep_keepalive_check(uct_ep_h ep, uct_keepalive_info_t *ka,
pid_t pid, unsigned flags,
uct_completion_t *comp);

Expand Down
4 changes: 1 addition & 3 deletions src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_ep_t, const uct_ep_params_t *params)
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);

self->remote_pid = *(const pid_t*)params->iface_addr;
self->keepalive = NULL;

return UCS_OK;
return uct_ep_keepalive_init(&self->keepalive, self->remote_pid);
}

static UCS_CLASS_CLEANUP_FUNC(uct_cuda_ipc_ep_t)
{
ucs_free(self->keepalive);
}

UCS_CLASS_DEFINE(uct_cuda_ipc_ep_t, uct_base_ep_t)
Expand Down
2 changes: 1 addition & 1 deletion src/uct/cuda/cuda_ipc/cuda_ipc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
typedef struct uct_cuda_ipc_ep {
uct_base_ep_t super;
pid_t remote_pid;
uct_keepalive_info_t *keepalive; /* keepalive metadata */
uct_keepalive_info_t keepalive; /* keepalive metadata */
} uct_cuda_ipc_ep_t;


Expand Down
32 changes: 21 additions & 11 deletions src/uct/sm/mm/base/mm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ static void uct_mm_ep_signal_remote(uct_mm_ep_t *ep)
}
}

void uct_mm_ep_cleanup_remote_segs(uct_mm_ep_t *ep)
{
uct_mm_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_mm_iface_t);
uct_mm_remote_seg_t remote_seg;

kh_foreach_value(&ep->remote_segs, remote_seg, {
uct_mm_iface_mapper_call(iface, mem_detach, &remote_seg);
})

kh_destroy_inplace(uct_mm_remote_seg, &ep->remote_segs);
}

static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params)
{
uct_mm_iface_t *iface = ucs_derived_of(params->iface, uct_mm_iface_t);
Expand Down Expand Up @@ -163,14 +176,20 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params)
/* Initialize remote FIFO control structure */
uct_mm_iface_set_fifo_ptrs(fifo_ptr, &self->fifo_ctl, &self->fifo_elems);
self->cached_tail = self->fifo_ctl->tail;
self->keepalive = NULL;
ucs_arbiter_elem_init(&self->arb_elem);

status = uct_ep_keepalive_init(&self->keepalive, self->fifo_ctl->pid);
if (status != UCS_OK) {
goto err_free_segs;
}

ucs_debug("created mm ep %p, connected to remote FIFO id 0x%"PRIx64,
self, addr->fifo_seg_id);

return UCS_OK;

err_free_segs:
uct_mm_ep_cleanup_remote_segs(self);
err_free_md_addr:
ucs_free(self->remote_iface_addr);
err:
Expand All @@ -179,18 +198,9 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params)

static UCS_CLASS_CLEANUP_FUNC(uct_mm_ep_t)
{
uct_mm_iface_t *iface = ucs_derived_of(self->super.super.iface, uct_mm_iface_t);
uct_mm_remote_seg_t remote_seg;

ucs_free(self->keepalive);
uct_mm_ep_pending_purge(&self->super.super, NULL, NULL);

kh_foreach_value(&self->remote_segs, remote_seg, {
uct_mm_iface_mapper_call(iface, mem_detach, &remote_seg);
})

uct_mm_ep_cleanup_remote_segs(self);
ucs_free(self->remote_iface_addr);
kh_destroy_inplace(uct_mm_remote_seg, &self->remote_segs);
}

UCS_CLASS_DEFINE(uct_mm_ep_t, uct_base_ep_t)
Expand Down
2 changes: 1 addition & 1 deletion src/uct/sm/mm/base/mm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ typedef struct uct_mm_ep {
the interface as long as one of the endpoints is unable to send */
ucs_arbiter_elem_t arb_elem;

uct_keepalive_info_t *keepalive; /* keepalive info */
uct_keepalive_info_t keepalive; /* keepalive info */
} uct_mm_ep_t;


Expand Down
8 changes: 3 additions & 5 deletions src/uct/sm/scopy/cma/cma_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ static UCS_CLASS_INIT_FUNC(uct_cma_ep_t, const uct_ep_params_t *params)
UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
UCS_CLASS_CALL_SUPER_INIT(uct_scopy_ep_t, params);

self->remote_pid = *(const pid_t*)params->iface_addr &
~UCT_CMA_IFACE_ADDR_FLAG_PID_NS;
self->keepalive = NULL;
self->remote_pid = *(const pid_t*)params->iface_addr &
~UCT_CMA_IFACE_ADDR_FLAG_PID_NS;

return UCS_OK;
return uct_ep_keepalive_init(&self->keepalive, self->remote_pid);
}

static UCS_CLASS_CLEANUP_FUNC(uct_cma_ep_t)
{
ucs_free(self->keepalive);
}

UCS_CLASS_DEFINE(uct_cma_ep_t, uct_scopy_ep_t)
Expand Down
2 changes: 1 addition & 1 deletion src/uct/sm/scopy/cma/cma_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
typedef struct uct_cma_ep {
uct_scopy_ep_t super;
pid_t remote_pid;
uct_keepalive_info_t *keepalive;
uct_keepalive_info_t keepalive;
} uct_cma_ep_t;


Expand Down
Loading

0 comments on commit 7135263

Please sign in to comment.