diff --git a/src/ucs/Makefile.am b/src/ucs/Makefile.am index 644e2f7bdd8..9e0bd2ffb1c 100644 --- a/src/ucs/Makefile.am +++ b/src/ucs/Makefile.am @@ -127,6 +127,7 @@ noinst_HEADERS = \ type/float8.h \ async/async.h \ async/pipe.h \ + async/eventfd.h \ async/signal.h \ async/thread.h \ async/async_int.h @@ -145,6 +146,7 @@ libucs_la_SOURCES = \ async/async.c \ async/signal.c \ async/pipe.c \ + async/eventfd.c \ async/thread.c \ config/global_opts.c \ config/ucm_opts.c \ diff --git a/src/ucs/async/eventfd.c b/src/ucs/async/eventfd.c new file mode 100644 index 00000000000..7d3afc38240 --- /dev/null +++ b/src/ucs/async/eventfd.c @@ -0,0 +1,70 @@ +/** +* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2023. ALL RIGHTS RESERVED. +* +* See file LICENSE for terms. +*/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "eventfd.h" + +#include +#include + + +typedef ssize_t (*ucs_async_eventfd_cb_t)(int fd, void *buf, size_t count); + + +static inline ucs_status_t +ucs_async_eventfd_common_io(int fd, int blocking, ucs_async_eventfd_cb_t cb) +{ + uint64_t dummy = 1; + int ret; + + do { + ret = cb(fd, &dummy, sizeof(dummy)); + if (ret > 0) { + return UCS_OK; + } + + if ((ret < 0) && (errno != EAGAIN) && (errno != EINTR)) { + ucs_error("eventfd error (fd %d blocking %d): %m", fd, blocking); + return UCS_ERR_IO_ERROR; + } + } while (blocking); + + return UCS_ERR_NO_PROGRESS; +} + +ucs_status_t ucs_async_eventfd_create(int *fd_p) +{ + int local_fd; + + local_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + if (local_fd == -1) { + ucs_error("failed to create event fd: %m"); + return UCS_ERR_IO_ERROR; + } + + *fd_p = local_fd; + return UCS_OK; +} + +void ucs_async_eventfd_destroy(int fd) +{ + if (fd != UCS_ASYNC_EVENTFD_INVALID_FD) { + close(fd); + } +} + +ucs_status_t ucs_async_eventfd_poll(int fd) +{ + return ucs_async_eventfd_common_io(fd, 0, (ucs_async_eventfd_cb_t)read); +} + +ucs_status_t ucs_async_eventfd_signal(int fd) +{ + return ucs_async_eventfd_common_io(fd, 1, (ucs_async_eventfd_cb_t)write); +} diff --git a/src/ucs/async/eventfd.h b/src/ucs/async/eventfd.h new file mode 100644 index 00000000000..f32b54006f3 --- /dev/null +++ b/src/ucs/async/eventfd.h @@ -0,0 +1,68 @@ +/** +* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2023. ALL RIGHTS RESERVED. +* +* See file LICENSE for terms. +*/ + +#ifndef UCS_ASYNC_EVENTFD_H +#define UCS_ASYNC_EVENTFD_H + +#include +#include + +BEGIN_C_DECLS + + +/** + * Represent either an unitialized or a closed event file descriptor. + */ +#define UCS_ASYNC_EVENTFD_INVALID_FD (-1) + + +/** + * @ingroup UCS_RESOURCE + * + * Create an event file descriptor. This file descriptor can later be passed as + * arguments to poll/signal functions to wait for notifications or to notify + * pollers. + * + * @param fd Pointer to integer which is populated with a file descriptor. + */ +ucs_status_t ucs_async_eventfd_create(int *fd); + + +/** + * @ingroup UCS_RESOURCE + * + * Destroy an event file descriptor. + * + * @param fd File descriptor to be closed. + */ +void ucs_async_eventfd_destroy(int fd); + + +/** + * @ingroup UCS_RESOURCE + * + * Notify a file descriptor when it is polled. An appropriate error is returned + * upon failure. + * + * @param fd File descriptor which will be notified. + */ +ucs_status_t ucs_async_eventfd_signal(int fd); + + +/** + * @ingroup UCS_RESOURCE + * + * Poll on a file descriptor for incoming notifications. If no notifications are + * observed then UCS_ERR_NO_PROGRESS is returned. An appropriate error is + * returned upon failure. + * + * @param fd File descriptor to be polled on. + */ +ucs_status_t ucs_async_eventfd_poll(int fd); + +END_C_DECLS + +#endif diff --git a/src/uct/cuda/base/cuda_iface.c b/src/uct/cuda/base/cuda_iface.c index f5d44b99102..40c3fe03743 100644 --- a/src/uct/cuda/base/cuda_iface.c +++ b/src/uct/cuda/base/cuda_iface.c @@ -36,3 +36,53 @@ uct_cuda_base_query_devices( return uct_cuda_base_query_devices_common(md, UCT_DEVICE_TYPE_ACC, tl_devices_p, num_tl_devices_p); } + +#if (__CUDACC_VER_MAJOR__ >= 100000) +void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(void *arg) +#else +void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(CUstream hStream, CUresult status, + void *arg) +#endif +{ + uct_cuda_iface_t *cuda_iface = arg; + + ucs_async_eventfd_signal(cuda_iface->eventfd); +} + +ucs_status_t uct_cuda_base_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p) +{ + uct_cuda_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_iface_t); + ucs_status_t status; + + if (iface->eventfd == UCS_ASYNC_EVENTFD_INVALID_FD) { + status = ucs_async_eventfd_create(&iface->eventfd); + if (status != UCS_OK) { + return status; + } + } + + *fd_p = iface->eventfd; + return UCS_OK; +} + +UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops, + uct_iface_internal_ops_t *ops, uct_md_h md, + uct_worker_h worker, const uct_iface_params_t *params, + const uct_iface_config_t *tl_config, + const char *dev_name) +{ + UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, tl_ops, ops, md, worker, params, + tl_config UCS_STATS_ARG(params->stats_root) + UCS_STATS_ARG(dev_name)); + + self->eventfd = UCS_ASYNC_EVENTFD_INVALID_FD; + + return UCS_OK; +} + +static UCS_CLASS_CLEANUP_FUNC(uct_cuda_iface_t) +{ + ucs_async_eventfd_destroy(self->eventfd); +} + +UCS_CLASS_DEFINE(uct_cuda_iface_t, uct_base_iface_t); diff --git a/src/uct/cuda/base/cuda_iface.h b/src/uct/cuda/base/cuda_iface.h index d97dd472af6..85f7ac2ce7e 100644 --- a/src/uct/cuda/base/cuda_iface.h +++ b/src/uct/cuda/base/cuda_iface.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -141,6 +142,11 @@ typedef enum uct_cuda_base_gen { } uct_cuda_base_gen_t; +typedef struct uct_cuda_iface { + uct_base_iface_t super; + int eventfd; +} uct_cuda_iface_t; + ucs_status_t uct_cuda_base_query_devices_common( uct_md_h md, uct_device_type_t dev_type, @@ -155,4 +161,18 @@ ucs_status_t uct_cuda_base_get_sys_dev(CUdevice cuda_device, ucs_sys_device_t *sys_dev_p); +ucs_status_t uct_cuda_base_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p); + +#if (__CUDACC_VER_MAJOR__ >= 100000) +void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(void *arg); +#else +void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(CUstream hStream, CUresult status, + void *arg); +#endif + +UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops, + uct_iface_internal_ops_t *ops, uct_md_h md, + uct_worker_h worker, const uct_iface_params_t *params, + const uct_iface_config_t *tl_config, const char *dev_name); + #endif diff --git a/src/uct/cuda/cuda_copy/cuda_copy_ep.c b/src/uct/cuda/cuda_copy/cuda_copy_ep.c index 1c52768f4f3..310df83d3d7 100644 --- a/src/uct/cuda/cuda_copy/cuda_copy_ep.c +++ b/src/uct/cuda/cuda_copy/cuda_copy_ep.c @@ -26,7 +26,7 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_ep_t, const uct_ep_params_t *params) uct_cuda_copy_iface_t); UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params); - UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super); + UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super); return UCS_OK; } diff --git a/src/uct/cuda/cuda_copy/cuda_copy_iface.c b/src/uct/cuda/cuda_copy/cuda_copy_iface.c index 4c71a6f3ac9..ea4cf0a0d52 100644 --- a/src/uct/cuda/cuda_copy/cuda_copy_iface.c +++ b/src/uct/cuda/cuda_copy/cuda_copy_iface.c @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include @@ -73,7 +73,7 @@ static ucs_status_t uct_cuda_copy_iface_query(uct_iface_h tl_iface, { uct_cuda_copy_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_copy_iface_t); - uct_base_iface_query(&iface->super, iface_attr); + uct_base_iface_query(&iface->super.super, iface_attr); iface_attr->iface_addr_len = sizeof(uct_cuda_copy_iface_addr_t); iface_attr->device_addr_len = 0; @@ -87,7 +87,7 @@ static ucs_status_t uct_cuda_copy_iface_query(uct_iface_h tl_iface, iface_attr->cap.event_flags = UCT_IFACE_FLAG_EVENT_SEND_COMP | UCT_IFACE_FLAG_EVENT_RECV | - UCT_IFACE_FLAG_EVENT_ASYNC_CB; + UCT_IFACE_FLAG_EVENT_FD; iface_attr->cap.put.max_short = UINT_MAX; iface_attr->cap.put.max_bcopy = 0; @@ -209,22 +209,6 @@ static unsigned uct_cuda_copy_iface_progress(uct_iface_h tl_iface) return count; } -#if (__CUDACC_VER_MAJOR__ >= 100000) -static void CUDA_CB myHostFn(void *cuda_copy_iface) -#else -static void CUDA_CB myHostCallback(CUstream hStream, CUresult status, - void *cuda_copy_iface) -#endif -{ - uct_cuda_copy_iface_t *iface = cuda_copy_iface; - - ucs_assert(iface->async.event_cb != NULL); - /* notify user */ - UCS_ASYNC_BLOCK(iface->super.worker->async); - iface->async.event_cb(iface->async.event_arg, 0); - UCS_ASYNC_UNBLOCK(iface->super.worker->async); -} - static ucs_status_t uct_cuda_copy_iface_event_fd_arm(uct_iface_h tl_iface, unsigned events) { @@ -242,18 +226,30 @@ static ucs_status_t uct_cuda_copy_iface_event_fd_arm(uct_iface_h tl_iface, } } + status = ucs_async_eventfd_poll(iface->super.eventfd); + if (status == UCS_OK) { + return UCS_ERR_BUSY; + } else if (status == UCS_ERR_IO_ERROR) { + return status; + } + + ucs_assertv(status == UCS_ERR_NO_PROGRESS, "%s", ucs_status_string(status)); + ucs_queue_for_each_safe(q_desc, iter, &iface->active_queue, queue) { event_q = &q_desc->event_queue; stream = &q_desc->stream; if (!ucs_queue_is_empty(event_q)) { status = #if (__CUDACC_VER_MAJOR__ >= 100000) - UCT_CUDADRV_FUNC_LOG_ERR(cuLaunchHostFunc(*stream, - myHostFn, iface)); + UCT_CUDADRV_FUNC_LOG_ERR( + cuLaunchHostFunc(*stream, + uct_cuda_base_iface_stream_cb_fxn, + &iface->super)); #else - UCT_CUDADRV_FUNC_LOG_ERR(cuStreamAddCallback(*stream, - myHostCallback, - iface, 0)); + UCT_CUDADRV_FUNC_LOG_ERR( + cuStreamAddCallback(*stream, + uct_cuda_base_iface_stream_cb_fxn, + &iface->super, 0)); #endif if (UCS_OK != status) { return status; @@ -280,7 +276,7 @@ static uct_iface_ops_t uct_cuda_copy_iface_ops = { .iface_progress_enable = uct_base_iface_progress_enable, .iface_progress_disable = uct_base_iface_progress_disable, .iface_progress = uct_cuda_copy_iface_progress, - .iface_event_fd_get = (uct_iface_event_fd_get_func_t)ucs_empty_function_return_success, + .iface_event_fd_get = uct_cuda_base_iface_event_fd_get, .iface_event_arm = uct_cuda_copy_iface_event_fd_arm, .iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_cuda_copy_iface_t), .iface_query = uct_cuda_copy_iface_query, @@ -409,11 +405,9 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h work ucs_memory_type_t src, dst; ucs_mpool_params_t mp_params; - UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, &uct_cuda_copy_iface_ops, + UCS_CLASS_CALL_SUPER_INIT(uct_cuda_iface_t, &uct_cuda_copy_iface_ops, &uct_cuda_copy_iface_internal_ops, md, worker, - params, - tl_config UCS_STATS_ARG(params->stats_root) - UCS_STATS_ARG("cuda_copy")); + params, tl_config, "cuda_copy"); if (strncmp(params->mode.device.dev_name, UCT_CUDA_DEV_NAME, strlen(UCT_CUDA_DEV_NAME)) != 0) { @@ -438,9 +432,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h work return UCS_ERR_IO_ERROR; } - uct_iface_set_async_event_params(params, &self->async.event_cb, - &self->async.event_arg); - ucs_queue_head_init(&self->active_queue); for (src = 0; src < UCS_MEMORY_TYPE_LAST; ++src) { @@ -463,7 +454,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_copy_iface_t) ucs_queue_head_t *event_q; ucs_memory_type_t src, dst; - uct_base_iface_progress_disable(&self->super.super, + uct_base_iface_progress_disable(&self->super.super.super, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&cuda_context)); @@ -494,7 +485,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_copy_iface_t) ucs_mpool_cleanup(&self->cuda_event_desc, 1); } -UCS_CLASS_DEFINE(uct_cuda_copy_iface_t, uct_base_iface_t); +UCS_CLASS_DEFINE(uct_cuda_copy_iface_t, uct_cuda_iface_t); UCS_CLASS_DEFINE_NEW_FUNC(uct_cuda_copy_iface_t, uct_iface_t, uct_md_h, uct_worker_h, const uct_iface_params_t*, const uct_iface_config_t*); static UCS_CLASS_DEFINE_DELETE_FUNC(uct_cuda_copy_iface_t, uct_iface_t); diff --git a/src/uct/cuda/cuda_copy/cuda_copy_iface.h b/src/uct/cuda/cuda_copy/cuda_copy_iface.h index 13c8c24d16d..f0f91435a79 100644 --- a/src/uct/cuda/cuda_copy/cuda_copy_iface.h +++ b/src/uct/cuda/cuda_copy/cuda_copy_iface.h @@ -26,7 +26,7 @@ typedef struct uct_cuda_copy_queue_desc { typedef struct uct_cuda_copy_iface { - uct_base_iface_t super; + uct_cuda_iface_t super; /* used to store uuid and check iface reachability */ uct_cuda_copy_iface_addr_t id; /* pool of cuda events to check completion of memcpy operations */ @@ -35,6 +35,8 @@ typedef struct uct_cuda_copy_iface { ucs_queue_head_t active_queue; /* stream used to issue short operations */ cudaStream_t short_stream; + /* fd to get event notifications */ + int eventfd; /* stream used to issue short operations */ CUcontext cuda_context; /* array of queue descriptors for each src/dst memory type combination */ diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c b/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c index 27e043a4fee..238c2157fdf 100644 --- a/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c +++ b/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c @@ -28,7 +28,7 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_ep_t, const uct_ep_params_t *params) uct_cuda_ipc_iface_t); UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params); - UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super); + UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super); self->remote_pid = *(const pid_t*)params->iface_addr; diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c index e629154c832..3194109c5fe 100644 --- a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c +++ b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include @@ -191,7 +191,7 @@ static ucs_status_t uct_cuda_ipc_iface_query(uct_iface_h tl_iface, { uct_cuda_ipc_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_ipc_iface_t); - uct_base_iface_query(&iface->super, iface_attr); + uct_base_iface_query(&iface->super.super, iface_attr); iface_attr->iface_addr_len = sizeof(pid_t); iface_attr->device_addr_len = sizeof(uint64_t); @@ -250,56 +250,6 @@ uct_cuda_ipc_iface_flush(uct_iface_h tl_iface, unsigned flags, return UCS_INPROGRESS; } -static ucs_status_t uct_cuda_ipc_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p) -{ - uct_cuda_ipc_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_ipc_iface_t); - - if (-1 == iface->eventfd) { - iface->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - if (iface->eventfd == -1) { - ucs_error("Failed to create event fd: %m"); - return UCS_ERR_IO_ERROR; - } - } - - *fd_p = iface->eventfd; - return UCS_OK; -} - -static void uct_cuda_ipc_common_cb(void *cuda_ipc_iface) -{ - uct_cuda_ipc_iface_t *iface = cuda_ipc_iface; - uint64_t dummy = 1; - int ret; - - /* No error handling yet */ - do { - ret = write(iface->eventfd, &dummy, sizeof(dummy)); - if (ret == sizeof(dummy)) { - return; - } else if (ret == -1) { - if (errno == EAGAIN) { - continue; - } else if (errno != EINTR) { - ucs_error("Signaling wakeup failed: %m"); - return; - } - } else { - ucs_assert(ret == 0); - } - } while (ret == 0); -} - -#if (__CUDACC_VER_MAJOR__ >= 100000) -static void CUDA_CB myHostFn(void *iface) -#else -static void CUDA_CB myHostCallback(CUstream hStream, CUresult status, - void *iface) -#endif -{ - uct_cuda_ipc_common_cb(iface); -} - static UCS_F_ALWAYS_INLINE unsigned uct_cuda_ipc_progress_event_q(uct_cuda_ipc_iface_t *iface, ucs_queue_head_t *event_q) @@ -355,48 +305,34 @@ static ucs_status_t uct_cuda_ipc_iface_event_fd_arm(uct_iface_h tl_iface, unsigned events) { uct_cuda_ipc_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_ipc_iface_t); - int ret; int i; - uint64_t dummy; ucs_status_t status; if (uct_cuda_ipc_progress_event_q(iface, &iface->outstanding_d2d_event_q)) { return UCS_ERR_BUSY; } - ucs_assert(iface->eventfd != -1); - - do { - ret = read(iface->eventfd, &dummy, sizeof(dummy)); - if (ret == sizeof(dummy)) { - status = UCS_ERR_BUSY; - return status; - } else if (ret == -1) { - if (errno == EAGAIN) { - break; - } else if (errno != EINTR) { - ucs_error("read from internal event fd failed: %m"); - status = UCS_ERR_IO_ERROR; - return status; - } else { - return UCS_ERR_BUSY; - } - } else { - ucs_assert(ret == 0); - } - } while (ret != 0); + status = ucs_async_eventfd_poll(iface->super.eventfd); + if (status == UCS_OK) { + return UCS_ERR_BUSY; + } else if (status == UCS_ERR_IO_ERROR) { + return status; + } if (iface->streams_initialized) { for (i = 0; i < iface->config.max_streams; i++) { if (iface->stream_refcount[i]) { status = #if (__CUDACC_VER_MAJOR__ >= 100000) - UCT_CUDADRV_FUNC_LOG_ERR(cuLaunchHostFunc(iface->stream_d2d[i], - myHostFn, iface)); + UCT_CUDADRV_FUNC_LOG_ERR( + cuLaunchHostFunc(iface->stream_d2d[i], + uct_cuda_base_iface_stream_cb_fxn, + &iface->super)); #else - UCT_CUDADRV_FUNC_LOG_ERR(cuStreamAddCallback(iface->stream_d2d[i], - myHostCallback, - iface, 0)); + UCT_CUDADRV_FUNC_LOG_ERR( + cuStreamAddCallback(iface->stream_d2d[i], + uct_cuda_base_iface_stream_cb_fxn, + &iface->super, 0)); #endif if (UCS_OK != status) { return status; @@ -422,7 +358,7 @@ static uct_iface_ops_t uct_cuda_ipc_iface_ops = { .iface_progress_enable = uct_base_iface_progress_enable, .iface_progress_disable = uct_base_iface_progress_disable, .iface_progress = uct_cuda_ipc_iface_progress, - .iface_event_fd_get = uct_cuda_ipc_iface_event_fd_get, + .iface_event_fd_get = uct_cuda_base_iface_event_fd_get, .iface_event_arm = uct_cuda_ipc_iface_event_fd_arm, .iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_cuda_ipc_iface_t), .iface_query = uct_cuda_ipc_iface_query, @@ -490,10 +426,9 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_iface_t, uct_md_h md, uct_worker_h worke ucs_mpool_params_t mp_params; config = ucs_derived_of(tl_config, uct_cuda_ipc_iface_config_t); - UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, &uct_cuda_ipc_iface_ops, + UCS_CLASS_CALL_SUPER_INIT(uct_cuda_iface_t, &uct_cuda_ipc_iface_ops, &uct_base_iface_internal_ops, md, worker, params, - tl_config UCS_STATS_ARG(params->stats_root) - UCS_STATS_ARG("cuda_ipc")); + tl_config, "cuda_ipc"); if (strncmp(params->mode.device.dev_name, UCT_CUDA_DEV_NAME, strlen(UCT_CUDA_DEV_NAME)) != 0) { @@ -521,7 +456,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_iface_t, uct_md_h md, uct_worker_h worke return UCS_ERR_IO_ERROR; } - self->eventfd = -1; self->streams_initialized = 0; self->cuda_context = 0; ucs_queue_head_init(&self->outstanding_d2d_event_q); @@ -550,12 +484,9 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_ipc_iface_t) self->streams_initialized = 0; } - uct_base_iface_progress_disable(&self->super.super, + uct_base_iface_progress_disable(&self->super.super.super, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); ucs_mpool_cleanup(&self->event_desc, 1); - if (self->eventfd != -1) { - close(self->eventfd); - } } ucs_status_t @@ -567,7 +498,7 @@ uct_cuda_ipc_query_devices( tl_devices_p, num_tl_devices_p); } -UCS_CLASS_DEFINE(uct_cuda_ipc_iface_t, uct_base_iface_t); +UCS_CLASS_DEFINE(uct_cuda_ipc_iface_t, uct_cuda_iface_t); UCS_CLASS_DEFINE_NEW_FUNC(uct_cuda_ipc_iface_t, uct_iface_t, uct_md_h, uct_worker_h, const uct_iface_params_t*, const uct_iface_config_t*); static UCS_CLASS_DEFINE_DELETE_FUNC(uct_cuda_ipc_iface_t, uct_iface_t); diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h index f266a861fd6..5e14a916073 100644 --- a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h +++ b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h @@ -7,6 +7,7 @@ #define UCT_CUDA_IPC_IFACE_H #include +#include #include #include #include @@ -18,8 +19,9 @@ #define UCT_CUDA_IPC_MAX_PEERS 16 + typedef struct uct_cuda_ipc_iface { - uct_base_iface_t super; + uct_cuda_iface_t super; ucs_mpool_t event_desc; /* cuda event desc */ ucs_queue_head_t outstanding_d2d_event_q; /* stream for outstanding d2d */ int eventfd; /* get event notifications */