Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/CUDA_COPY: use EVENT_FD instead of EVENT_ASYNC_CB - v1.14.x #8840

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ucs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ noinst_HEADERS = \
type/float8.h \
async/async.h \
async/pipe.h \
async/eventfd.h \
async/signal.h \
async/thread.h \
async/async_int.h
Expand All @@ -145,6 +146,7 @@ libucs_la_SOURCES = \
async/async.c \
async/signal.c \
async/pipe.c \
async/eventfd.c \
async/thread.c \
config/global_opts.c \
config/ucm_opts.c \
Expand Down
70 changes: 70 additions & 0 deletions src/ucs/async/eventfd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2023. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "eventfd.h"

#include <ucs/debug/log.h>
#include <ucs/sys/sys.h>


typedef ssize_t (*ucs_async_eventfd_cb_t)(int fd, void *buf, size_t count);


static inline ucs_status_t
ucs_async_eventfd_common_io(int fd, int blocking, ucs_async_eventfd_cb_t cb)
{
uint64_t dummy = 1;
int ret;

do {
ret = cb(fd, &dummy, sizeof(dummy));
if (ret > 0) {
return UCS_OK;
}

if ((ret < 0) && (errno != EAGAIN) && (errno != EINTR)) {
ucs_error("eventfd error (fd %d blocking %d): %m", fd, blocking);
return UCS_ERR_IO_ERROR;
}
} while (blocking);

return UCS_ERR_NO_PROGRESS;
}

ucs_status_t ucs_async_eventfd_create(int *fd_p)
{
int local_fd;

local_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (local_fd == -1) {
ucs_error("failed to create event fd: %m");
return UCS_ERR_IO_ERROR;
}

*fd_p = local_fd;
return UCS_OK;
}

void ucs_async_eventfd_destroy(int fd)
{
if (fd != UCS_ASYNC_EVENTFD_INVALID_FD) {
close(fd);
}
}

ucs_status_t ucs_async_eventfd_poll(int fd)
{
return ucs_async_eventfd_common_io(fd, 0, (ucs_async_eventfd_cb_t)read);
}

ucs_status_t ucs_async_eventfd_signal(int fd)
{
return ucs_async_eventfd_common_io(fd, 1, (ucs_async_eventfd_cb_t)write);
}
68 changes: 68 additions & 0 deletions src/ucs/async/eventfd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2023. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCS_ASYNC_EVENTFD_H
#define UCS_ASYNC_EVENTFD_H

#include <ucs/type/status.h>
#include <sys/eventfd.h>

BEGIN_C_DECLS


/**
* Represent either an unitialized or a closed event file descriptor.
*/
#define UCS_ASYNC_EVENTFD_INVALID_FD (-1)


/**
* @ingroup UCS_RESOURCE
*
* Create an event file descriptor. This file descriptor can later be passed as
* arguments to poll/signal functions to wait for notifications or to notify
* pollers.
*
* @param fd Pointer to integer which is populated with a file descriptor.
*/
ucs_status_t ucs_async_eventfd_create(int *fd);


/**
* @ingroup UCS_RESOURCE
*
* Destroy an event file descriptor.
*
* @param fd File descriptor to be closed.
*/
void ucs_async_eventfd_destroy(int fd);


/**
* @ingroup UCS_RESOURCE
*
* Notify a file descriptor when it is polled. An appropriate error is returned
* upon failure.
*
* @param fd File descriptor which will be notified.
*/
ucs_status_t ucs_async_eventfd_signal(int fd);


/**
* @ingroup UCS_RESOURCE
*
* Poll on a file descriptor for incoming notifications. If no notifications are
* observed then UCS_ERR_NO_PROGRESS is returned. An appropriate error is
* returned upon failure.
*
* @param fd File descriptor to be polled on.
*/
ucs_status_t ucs_async_eventfd_poll(int fd);

END_C_DECLS

#endif
50 changes: 50 additions & 0 deletions src/uct/cuda/base/cuda_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,53 @@ uct_cuda_base_query_devices(
return uct_cuda_base_query_devices_common(md, UCT_DEVICE_TYPE_ACC,
tl_devices_p, num_tl_devices_p);
}

#if (__CUDACC_VER_MAJOR__ >= 100000)
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(void *arg)
#else
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(CUstream hStream, CUresult status,
void *arg)
#endif
{
uct_cuda_iface_t *cuda_iface = arg;

ucs_async_eventfd_signal(cuda_iface->eventfd);
}

ucs_status_t uct_cuda_base_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p)
{
uct_cuda_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_iface_t);
ucs_status_t status;

if (iface->eventfd == UCS_ASYNC_EVENTFD_INVALID_FD) {
status = ucs_async_eventfd_create(&iface->eventfd);
if (status != UCS_OK) {
return status;
}
}

*fd_p = iface->eventfd;
return UCS_OK;
}

UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops,
uct_iface_internal_ops_t *ops, uct_md_h md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_iface_config_t *tl_config,
const char *dev_name)
{
UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, tl_ops, ops, md, worker, params,
tl_config UCS_STATS_ARG(params->stats_root)
UCS_STATS_ARG(dev_name));

self->eventfd = UCS_ASYNC_EVENTFD_INVALID_FD;

return UCS_OK;
}

static UCS_CLASS_CLEANUP_FUNC(uct_cuda_iface_t)
{
ucs_async_eventfd_destroy(self->eventfd);
}

UCS_CLASS_DEFINE(uct_cuda_iface_t, uct_base_iface_t);
20 changes: 20 additions & 0 deletions src/uct/cuda/base/cuda_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <uct/base/uct_iface.h>
#include <ucs/sys/preprocessor.h>
#include <ucs/profile/profile.h>
#include <ucs/async/eventfd.h>
#include <cuda_runtime.h>
#include <cuda.h>
#include <nvml.h>
Expand Down Expand Up @@ -141,6 +142,11 @@ typedef enum uct_cuda_base_gen {
} uct_cuda_base_gen_t;


typedef struct uct_cuda_iface {
uct_base_iface_t super;
int eventfd;
} uct_cuda_iface_t;

ucs_status_t
uct_cuda_base_query_devices_common(
uct_md_h md, uct_device_type_t dev_type,
Expand All @@ -155,4 +161,18 @@ ucs_status_t
uct_cuda_base_get_sys_dev(CUdevice cuda_device,
ucs_sys_device_t *sys_dev_p);

ucs_status_t uct_cuda_base_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p);

#if (__CUDACC_VER_MAJOR__ >= 100000)
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(void *arg);
#else
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(CUstream hStream, CUresult status,
void *arg);
#endif

UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops,
uct_iface_internal_ops_t *ops, uct_md_h md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_iface_config_t *tl_config, const char *dev_name);

#endif
2 changes: 1 addition & 1 deletion src/uct/cuda/cuda_copy/cuda_copy_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_ep_t, const uct_ep_params_t *params)
uct_cuda_copy_iface_t);

UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

return UCS_OK;
}
Expand Down
59 changes: 25 additions & 34 deletions src/uct/cuda/cuda_copy/cuda_copy_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <uct/cuda/base/cuda_md.h>
#include <ucs/type/class.h>
#include <ucs/sys/string.h>
#include <ucs/async/async.h>
#include <ucs/async/eventfd.h>
#include <ucs/arch/cpu.h>


Expand Down Expand Up @@ -73,7 +73,7 @@ static ucs_status_t uct_cuda_copy_iface_query(uct_iface_h tl_iface,
{
uct_cuda_copy_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_copy_iface_t);

uct_base_iface_query(&iface->super, iface_attr);
uct_base_iface_query(&iface->super.super, iface_attr);

iface_attr->iface_addr_len = sizeof(uct_cuda_copy_iface_addr_t);
iface_attr->device_addr_len = 0;
Expand All @@ -87,7 +87,7 @@ static ucs_status_t uct_cuda_copy_iface_query(uct_iface_h tl_iface,

iface_attr->cap.event_flags = UCT_IFACE_FLAG_EVENT_SEND_COMP |
UCT_IFACE_FLAG_EVENT_RECV |
UCT_IFACE_FLAG_EVENT_ASYNC_CB;
UCT_IFACE_FLAG_EVENT_FD;

iface_attr->cap.put.max_short = UINT_MAX;
iface_attr->cap.put.max_bcopy = 0;
Expand Down Expand Up @@ -209,22 +209,6 @@ static unsigned uct_cuda_copy_iface_progress(uct_iface_h tl_iface)
return count;
}

#if (__CUDACC_VER_MAJOR__ >= 100000)
static void CUDA_CB myHostFn(void *cuda_copy_iface)
#else
static void CUDA_CB myHostCallback(CUstream hStream, CUresult status,
void *cuda_copy_iface)
#endif
{
uct_cuda_copy_iface_t *iface = cuda_copy_iface;

ucs_assert(iface->async.event_cb != NULL);
/* notify user */
UCS_ASYNC_BLOCK(iface->super.worker->async);
iface->async.event_cb(iface->async.event_arg, 0);
UCS_ASYNC_UNBLOCK(iface->super.worker->async);
}

static ucs_status_t uct_cuda_copy_iface_event_fd_arm(uct_iface_h tl_iface,
unsigned events)
{
Expand All @@ -242,18 +226,30 @@ static ucs_status_t uct_cuda_copy_iface_event_fd_arm(uct_iface_h tl_iface,
}
}

status = ucs_async_eventfd_poll(iface->super.eventfd);
if (status == UCS_OK) {
return UCS_ERR_BUSY;
} else if (status == UCS_ERR_IO_ERROR) {
return status;
}

ucs_assertv(status == UCS_ERR_NO_PROGRESS, "%s", ucs_status_string(status));

ucs_queue_for_each_safe(q_desc, iter, &iface->active_queue, queue) {
event_q = &q_desc->event_queue;
stream = &q_desc->stream;
if (!ucs_queue_is_empty(event_q)) {
status =
#if (__CUDACC_VER_MAJOR__ >= 100000)
UCT_CUDADRV_FUNC_LOG_ERR(cuLaunchHostFunc(*stream,
myHostFn, iface));
UCT_CUDADRV_FUNC_LOG_ERR(
cuLaunchHostFunc(*stream,
uct_cuda_base_iface_stream_cb_fxn,
&iface->super));
#else
UCT_CUDADRV_FUNC_LOG_ERR(cuStreamAddCallback(*stream,
myHostCallback,
iface, 0));
UCT_CUDADRV_FUNC_LOG_ERR(
cuStreamAddCallback(*stream,
uct_cuda_base_iface_stream_cb_fxn,
&iface->super, 0));
#endif
if (UCS_OK != status) {
return status;
Expand All @@ -280,7 +276,7 @@ static uct_iface_ops_t uct_cuda_copy_iface_ops = {
.iface_progress_enable = uct_base_iface_progress_enable,
.iface_progress_disable = uct_base_iface_progress_disable,
.iface_progress = uct_cuda_copy_iface_progress,
.iface_event_fd_get = (uct_iface_event_fd_get_func_t)ucs_empty_function_return_success,
.iface_event_fd_get = uct_cuda_base_iface_event_fd_get,
.iface_event_arm = uct_cuda_copy_iface_event_fd_arm,
.iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_cuda_copy_iface_t),
.iface_query = uct_cuda_copy_iface_query,
Expand Down Expand Up @@ -409,11 +405,9 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h work
ucs_memory_type_t src, dst;
ucs_mpool_params_t mp_params;

UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, &uct_cuda_copy_iface_ops,
UCS_CLASS_CALL_SUPER_INIT(uct_cuda_iface_t, &uct_cuda_copy_iface_ops,
&uct_cuda_copy_iface_internal_ops, md, worker,
params,
tl_config UCS_STATS_ARG(params->stats_root)
UCS_STATS_ARG("cuda_copy"));
params, tl_config, "cuda_copy");

if (strncmp(params->mode.device.dev_name,
UCT_CUDA_DEV_NAME, strlen(UCT_CUDA_DEV_NAME)) != 0) {
Expand All @@ -438,9 +432,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h work
return UCS_ERR_IO_ERROR;
}

uct_iface_set_async_event_params(params, &self->async.event_cb,
&self->async.event_arg);

ucs_queue_head_init(&self->active_queue);

for (src = 0; src < UCS_MEMORY_TYPE_LAST; ++src) {
Expand All @@ -463,7 +454,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_copy_iface_t)
ucs_queue_head_t *event_q;
ucs_memory_type_t src, dst;

uct_base_iface_progress_disable(&self->super.super,
uct_base_iface_progress_disable(&self->super.super.super,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&cuda_context));
Expand Down Expand Up @@ -494,7 +485,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_copy_iface_t)
ucs_mpool_cleanup(&self->cuda_event_desc, 1);
}

UCS_CLASS_DEFINE(uct_cuda_copy_iface_t, uct_base_iface_t);
UCS_CLASS_DEFINE(uct_cuda_copy_iface_t, uct_cuda_iface_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_cuda_copy_iface_t, uct_iface_t, uct_md_h, uct_worker_h,
const uct_iface_params_t*, const uct_iface_config_t*);
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_cuda_copy_iface_t, uct_iface_t);
Expand Down
Loading