Skip to content

Commit

Permalink
UCT/CUDA_COPY: use EVENT_FD instead of EVENT_ASYNC_CB
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay-Venkatesh committed Jan 31, 2023
1 parent d83ef40 commit 348c9dc
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 128 deletions.
2 changes: 2 additions & 0 deletions src/ucs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ noinst_HEADERS = \
type/float8.h \
async/async.h \
async/pipe.h \
async/eventfd.h \
async/signal.h \
async/thread.h \
async/async_int.h
Expand All @@ -145,6 +146,7 @@ libucs_la_SOURCES = \
async/async.c \
async/signal.c \
async/pipe.c \
async/eventfd.c \
async/thread.c \
config/global_opts.c \
config/ucm_opts.c \
Expand Down
70 changes: 70 additions & 0 deletions src/ucs/async/eventfd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2023. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "eventfd.h"

#include <ucs/debug/log.h>
#include <ucs/sys/sys.h>


typedef ssize_t (*ucs_async_eventfd_cb_t)(int fd, void *buf, size_t count);


static inline ucs_status_t
ucs_async_eventfd_common_io(int fd, int blocking, ucs_async_eventfd_cb_t cb)
{
uint64_t dummy = 1;
int ret;

do {
ret = cb(fd, &dummy, sizeof(dummy));
if (ret > 0) {
return UCS_OK;
}

if ((ret < 0) && (errno != EAGAIN) && (errno != EINTR)) {
ucs_error("eventfd error (fd %d blocking %d): %m", fd, blocking);
return UCS_ERR_IO_ERROR;
}
} while (blocking);

return UCS_ERR_NO_PROGRESS;
}

ucs_status_t ucs_async_eventfd_create(int *fd_p)
{
int local_fd;

local_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (local_fd == -1) {
ucs_error("failed to create event fd: %m");
return UCS_ERR_IO_ERROR;
}

*fd_p = local_fd;
return UCS_OK;
}

void ucs_async_eventfd_destroy(int fd)
{
if (fd != UCS_ASYNC_EVENTFD_INVALID_FD) {
close(fd);
}
}

ucs_status_t ucs_async_eventfd_poll(int fd)
{
return ucs_async_eventfd_common_io(fd, 0, (ucs_async_eventfd_cb_t)read);
}

ucs_status_t ucs_async_eventfd_signal(int fd)
{
return ucs_async_eventfd_common_io(fd, 1, (ucs_async_eventfd_cb_t)write);
}
68 changes: 68 additions & 0 deletions src/ucs/async/eventfd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2023. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCS_ASYNC_EVENTFD_H
#define UCS_ASYNC_EVENTFD_H

#include <ucs/type/status.h>
#include <sys/eventfd.h>

BEGIN_C_DECLS


/**
* Represent either an unitialized or a closed event file descriptor.
*/
#define UCS_ASYNC_EVENTFD_INVALID_FD (-1)


/**
* @ingroup UCS_RESOURCE
*
* Create an event file descriptor. This file descriptor can later be passed as
* arguments to poll/signal functions to wait for notifications or to notify
* pollers.
*
* @param fd Pointer to integer which is populated with a file descriptor.
*/
ucs_status_t ucs_async_eventfd_create(int *fd);


/**
* @ingroup UCS_RESOURCE
*
* Destroy an event file descriptor.
*
* @param fd File descriptor to be closed.
*/
void ucs_async_eventfd_destroy(int fd);


/**
* @ingroup UCS_RESOURCE
*
* Notify a file descriptor when it is polled. An appropriate error is returned
* upon failure.
*
* @param fd File descriptor which will be notified.
*/
ucs_status_t ucs_async_eventfd_signal(int fd);


/**
* @ingroup UCS_RESOURCE
*
* Poll on a file descriptor for incoming notifications. If no notifications are
* observed then UCS_ERR_NO_PROGRESS is returned. An appropriate error is
* returned upon failure.
*
* @param fd File descriptor to be polled on.
*/
ucs_status_t ucs_async_eventfd_poll(int fd);

END_C_DECLS

#endif
50 changes: 50 additions & 0 deletions src/uct/cuda/base/cuda_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,53 @@ uct_cuda_base_query_devices(
return uct_cuda_base_query_devices_common(md, UCT_DEVICE_TYPE_ACC,
tl_devices_p, num_tl_devices_p);
}

#if (__CUDACC_VER_MAJOR__ >= 100000)
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(void *arg)
#else
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(CUstream hStream, CUresult status,
void *arg)
#endif
{
uct_cuda_iface_t *cuda_iface = arg;

ucs_async_eventfd_signal(cuda_iface->eventfd);
}

ucs_status_t uct_cuda_base_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p)
{
uct_cuda_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_iface_t);
ucs_status_t status;

if (iface->eventfd == UCS_ASYNC_EVENTFD_INVALID_FD) {
status = ucs_async_eventfd_create(&iface->eventfd);
if (status != UCS_OK) {
return status;
}
}

*fd_p = iface->eventfd;
return UCS_OK;
}

UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops,
uct_iface_internal_ops_t *ops, uct_md_h md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_iface_config_t *tl_config,
const char *dev_name)
{
UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, tl_ops, ops, md, worker, params,
tl_config UCS_STATS_ARG(params->stats_root)
UCS_STATS_ARG(dev_name));

self->eventfd = UCS_ASYNC_EVENTFD_INVALID_FD;

return UCS_OK;
}

static UCS_CLASS_CLEANUP_FUNC(uct_cuda_iface_t)
{
ucs_async_eventfd_destroy(self->eventfd);
}

UCS_CLASS_DEFINE(uct_cuda_iface_t, uct_base_iface_t);
20 changes: 20 additions & 0 deletions src/uct/cuda/base/cuda_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <uct/base/uct_iface.h>
#include <ucs/sys/preprocessor.h>
#include <ucs/profile/profile.h>
#include <ucs/async/eventfd.h>
#include <cuda_runtime.h>
#include <cuda.h>
#include <nvml.h>
Expand Down Expand Up @@ -141,6 +142,11 @@ typedef enum uct_cuda_base_gen {
} uct_cuda_base_gen_t;


typedef struct uct_cuda_iface {
uct_base_iface_t super;
int eventfd;
} uct_cuda_iface_t;

ucs_status_t
uct_cuda_base_query_devices_common(
uct_md_h md, uct_device_type_t dev_type,
Expand All @@ -155,4 +161,18 @@ ucs_status_t
uct_cuda_base_get_sys_dev(CUdevice cuda_device,
ucs_sys_device_t *sys_dev_p);

ucs_status_t uct_cuda_base_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p);

#if (__CUDACC_VER_MAJOR__ >= 100000)
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(void *arg);
#else
void CUDA_CB uct_cuda_base_iface_stream_cb_fxn(CUstream hStream, CUresult status,
void *arg);
#endif

UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops,
uct_iface_internal_ops_t *ops, uct_md_h md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_iface_config_t *tl_config, const char *dev_name);

#endif
2 changes: 1 addition & 1 deletion src/uct/cuda/cuda_copy/cuda_copy_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_ep_t, const uct_ep_params_t *params)
uct_cuda_copy_iface_t);

UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

return UCS_OK;
}
Expand Down
59 changes: 25 additions & 34 deletions src/uct/cuda/cuda_copy/cuda_copy_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <uct/cuda/base/cuda_md.h>
#include <ucs/type/class.h>
#include <ucs/sys/string.h>
#include <ucs/async/async.h>
#include <ucs/async/eventfd.h>
#include <ucs/arch/cpu.h>


Expand Down Expand Up @@ -73,7 +73,7 @@ static ucs_status_t uct_cuda_copy_iface_query(uct_iface_h tl_iface,
{
uct_cuda_copy_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_copy_iface_t);

uct_base_iface_query(&iface->super, iface_attr);
uct_base_iface_query(&iface->super.super, iface_attr);

iface_attr->iface_addr_len = sizeof(uct_cuda_copy_iface_addr_t);
iface_attr->device_addr_len = 0;
Expand All @@ -87,7 +87,7 @@ static ucs_status_t uct_cuda_copy_iface_query(uct_iface_h tl_iface,

iface_attr->cap.event_flags = UCT_IFACE_FLAG_EVENT_SEND_COMP |
UCT_IFACE_FLAG_EVENT_RECV |
UCT_IFACE_FLAG_EVENT_ASYNC_CB;
UCT_IFACE_FLAG_EVENT_FD;

iface_attr->cap.put.max_short = UINT_MAX;
iface_attr->cap.put.max_bcopy = 0;
Expand Down Expand Up @@ -209,22 +209,6 @@ static unsigned uct_cuda_copy_iface_progress(uct_iface_h tl_iface)
return count;
}

#if (__CUDACC_VER_MAJOR__ >= 100000)
static void CUDA_CB myHostFn(void *cuda_copy_iface)
#else
static void CUDA_CB myHostCallback(CUstream hStream, CUresult status,
void *cuda_copy_iface)
#endif
{
uct_cuda_copy_iface_t *iface = cuda_copy_iface;

ucs_assert(iface->async.event_cb != NULL);
/* notify user */
UCS_ASYNC_BLOCK(iface->super.worker->async);
iface->async.event_cb(iface->async.event_arg, 0);
UCS_ASYNC_UNBLOCK(iface->super.worker->async);
}

static ucs_status_t uct_cuda_copy_iface_event_fd_arm(uct_iface_h tl_iface,
unsigned events)
{
Expand All @@ -242,18 +226,30 @@ static ucs_status_t uct_cuda_copy_iface_event_fd_arm(uct_iface_h tl_iface,
}
}

status = ucs_async_eventfd_poll(iface->super.eventfd);
if (status == UCS_OK) {
return UCS_ERR_BUSY;
} else if (status == UCS_ERR_IO_ERROR) {
return status;
}

ucs_assertv(status == UCS_ERR_NO_PROGRESS, "%s", ucs_status_string(status));

ucs_queue_for_each_safe(q_desc, iter, &iface->active_queue, queue) {
event_q = &q_desc->event_queue;
stream = &q_desc->stream;
if (!ucs_queue_is_empty(event_q)) {
status =
#if (__CUDACC_VER_MAJOR__ >= 100000)
UCT_CUDADRV_FUNC_LOG_ERR(cuLaunchHostFunc(*stream,
myHostFn, iface));
UCT_CUDADRV_FUNC_LOG_ERR(
cuLaunchHostFunc(*stream,
uct_cuda_base_iface_stream_cb_fxn,
&iface->super));
#else
UCT_CUDADRV_FUNC_LOG_ERR(cuStreamAddCallback(*stream,
myHostCallback,
iface, 0));
UCT_CUDADRV_FUNC_LOG_ERR(
cuStreamAddCallback(*stream,
uct_cuda_base_iface_stream_cb_fxn,
&iface->super, 0));
#endif
if (UCS_OK != status) {
return status;
Expand All @@ -280,7 +276,7 @@ static uct_iface_ops_t uct_cuda_copy_iface_ops = {
.iface_progress_enable = uct_base_iface_progress_enable,
.iface_progress_disable = uct_base_iface_progress_disable,
.iface_progress = uct_cuda_copy_iface_progress,
.iface_event_fd_get = (uct_iface_event_fd_get_func_t)ucs_empty_function_return_success,
.iface_event_fd_get = uct_cuda_base_iface_event_fd_get,
.iface_event_arm = uct_cuda_copy_iface_event_fd_arm,
.iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_cuda_copy_iface_t),
.iface_query = uct_cuda_copy_iface_query,
Expand Down Expand Up @@ -409,11 +405,9 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h work
ucs_memory_type_t src, dst;
ucs_mpool_params_t mp_params;

UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, &uct_cuda_copy_iface_ops,
UCS_CLASS_CALL_SUPER_INIT(uct_cuda_iface_t, &uct_cuda_copy_iface_ops,
&uct_cuda_copy_iface_internal_ops, md, worker,
params,
tl_config UCS_STATS_ARG(params->stats_root)
UCS_STATS_ARG("cuda_copy"));
params, tl_config, "cuda_copy");

if (strncmp(params->mode.device.dev_name,
UCT_CUDA_DEV_NAME, strlen(UCT_CUDA_DEV_NAME)) != 0) {
Expand All @@ -438,9 +432,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h work
return UCS_ERR_IO_ERROR;
}

uct_iface_set_async_event_params(params, &self->async.event_cb,
&self->async.event_arg);

ucs_queue_head_init(&self->active_queue);

for (src = 0; src < UCS_MEMORY_TYPE_LAST; ++src) {
Expand All @@ -463,7 +454,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_copy_iface_t)
ucs_queue_head_t *event_q;
ucs_memory_type_t src, dst;

uct_base_iface_progress_disable(&self->super.super,
uct_base_iface_progress_disable(&self->super.super.super,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&cuda_context));
Expand Down Expand Up @@ -494,7 +485,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_copy_iface_t)
ucs_mpool_cleanup(&self->cuda_event_desc, 1);
}

UCS_CLASS_DEFINE(uct_cuda_copy_iface_t, uct_base_iface_t);
UCS_CLASS_DEFINE(uct_cuda_copy_iface_t, uct_cuda_iface_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_cuda_copy_iface_t, uct_iface_t, uct_md_h, uct_worker_h,
const uct_iface_params_t*, const uct_iface_config_t*);
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_cuda_copy_iface_t, uct_iface_t);
Expand Down
Loading

0 comments on commit 348c9dc

Please sign in to comment.