Merge pull request #6364 from Akshay-Venkatesh/topic/v1.10.x-cuda-ipc-iface-cache

UCT/CUDA_IPC: make cuda-ipc cache global - v1.10.x
yosefe authored Feb 19, 2021
2 parents 4af10be + 2416b91 commit fdaf339
Showing 9 changed files with 219 additions and 124 deletions.
6 changes: 6 additions & 0 deletions src/tools/perf/lib/libperf.c
@@ -1866,6 +1866,12 @@ static ucs_status_t ucx_perf_thread_run_test(void* arg)
     ucx_perf_params_t* params = &perf->params;
     ucs_status_t status;
 
+    /* new threads need explicit device association */
+    status = perf->allocator->init(perf);
+    if (status != UCS_OK) {
+        goto out;
+    }
+
     if (params->warmup_iter > 0) {
         ucx_perf_set_warmup(perf, params);
         status = ucx_perf_funcs[params->api].run(perf);
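The libperf hunk above exists because CUDA binds devices per thread: a worker spawned for a multi-threaded test has no device association until the allocator's `init` hook runs. A minimal standalone sketch of the idea, assuming the CUDA runtime API (the `worker_arg_t` type and `main` harness are illustrative, not part of this commit):

```c
#include <cuda_runtime.h>
#include <pthread.h>
#include <stdio.h>

/* Hypothetical worker argument; in libperf the equivalent state lives in
 * the perf context and the binding happens in perf->allocator->init(). */
typedef struct {
    int device; /* GPU ordinal this worker should use */
} worker_arg_t;

static void *worker(void *arg)
{
    worker_arg_t *wa = (worker_arg_t*)arg;

    /* Equivalent of the allocator init hook: a thread created after the
     * main thread has no device association, so bind it explicitly before
     * any warmup or measured iterations touch device memory. */
    cudaError_t err = cudaSetDevice(wa->device);
    if (err != cudaSuccess) {
        fprintf(stderr, "cudaSetDevice: %s\n", cudaGetErrorString(err));
        return NULL;
    }

    /* ... warmup and measured loop would run here ... */
    return NULL;
}

int main(void)
{
    worker_arg_t arg = { .device = 0 };
    pthread_t tid;

    pthread_create(&tid, NULL, worker, &arg);
    pthread_join(tid, NULL);
    return 0;
}
```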
157 changes: 131 additions & 26 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.c
@@ -9,11 +9,44 @@
 #endif
 
 #include "cuda_ipc_cache.h"
+#include "cuda_ipc_iface.h"
 #include <ucs/debug/log.h>
 #include <ucs/debug/memtrack.h>
 #include <ucs/profile/profile.h>
 #include <ucs/sys/sys.h>
+#include <ucs/sys/string.h>
+#include <ucs/sys/math.h>
+#include <ucs/datastruct/khash.h>
+
+
+typedef struct uct_cuda_ipc_cache_hash_key {
+    pid_t    pid;
+    CUdevice cu_device;
+} uct_cuda_ipc_cache_hash_key_t;
+
+static UCS_F_ALWAYS_INLINE int
+uct_cuda_ipc_cache_hash_equal(uct_cuda_ipc_cache_hash_key_t key1,
+                              uct_cuda_ipc_cache_hash_key_t key2)
+{
+    return (key1.pid == key2.pid) && (key1.cu_device == key2.cu_device);
+}
+
+static UCS_F_ALWAYS_INLINE khint32_t
+uct_cuda_ipc_cache_hash_func(uct_cuda_ipc_cache_hash_key_t key)
+{
+    return kh_int_hash_func((key.pid << 8) | key.cu_device);
+}
+
+KHASH_INIT(cuda_ipc_rem_cache, uct_cuda_ipc_cache_hash_key_t,
+           uct_cuda_ipc_cache_t*, 1, uct_cuda_ipc_cache_hash_func,
+           uct_cuda_ipc_cache_hash_equal);
+
+typedef struct uct_cuda_ipc_remote_cache {
+    khash_t(cuda_ipc_rem_cache) hash;
+    ucs_recursive_spinlock_t    lock;
+} uct_cuda_ipc_remote_cache_t;
+
+uct_cuda_ipc_remote_cache_t uct_cuda_ipc_remote_cache;
 
 static ucs_pgt_dir_t *uct_cuda_ipc_cache_pgt_dir_alloc(const ucs_pgtable_t *pgtable)
 {
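The block above keys one remote cache per (pid, CUDA device) pair. Equality compares both fields, so collisions in the `(pid << 8) | cu_device` hash are only a performance concern, never a correctness one. Below is a self-contained sketch of the same khash pattern, assuming klib's `khash.h` (which UCX vendors as `ucs/datastruct/khash.h`); `cache_key_t`, `cache_key_hash`, and the demo values are illustrative, not part of this commit:

```c
#include <stdio.h>
#include <sys/types.h>
#include "khash.h" /* klib's single-header hash table */

typedef struct {
    pid_t pid;
    int   device;
} cache_key_t;

/* equality must compare every field; the hash only has to spread keys */
#define cache_key_hash(k)     kh_int_hash_func(((k).pid << 8) | (k).device)
#define cache_key_equal(a, b) ((a).pid == (b).pid && (a).device == (b).device)

KHASH_INIT(demo, cache_key_t, const char*, 1, cache_key_hash, cache_key_equal)

int main(void)
{
    khash_t(demo) *h = kh_init(demo);
    cache_key_t key  = { .pid = 1234, .device = 0 };
    int ret;

    /* kh_put returns the bucket; ret reports whether the key was already
     * present (0), the bucket was empty (1), or previously deleted (2) */
    khiter_t it = kh_put(demo, h, key, &ret);
    if (ret != 0) {
        kh_val(h, it) = "new cache for dest:1234:0";
    }

    it = kh_get(demo, h, key);
    if (it != kh_end(h)) {
        printf("%s\n", kh_val(h, it));
    }

    kh_destroy(demo, h);
    return 0;
}
```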
@@ -48,38 +48,42 @@ static void uct_cuda_ipc_cache_purge(uct_cuda_ipc_cache_t *cache)
 {
     uct_cuda_ipc_cache_region_t *region, *tmp;
     ucs_list_link_t region_list;
+    int active;
 
+    UCT_CUDADRV_CTX_ACTIVE(active);
+
     ucs_list_head_init(&region_list);
     ucs_pgtable_purge(&cache->pgtable, uct_cuda_ipc_cache_region_collect_callback,
                       &region_list);
     ucs_list_for_each_safe(region, tmp, &region_list, list) {
-        UCT_CUDADRV_FUNC_LOG_ERR(
-            cuIpcCloseMemHandle((CUdeviceptr)region->mapped_addr));
+        if (active) {
+            UCT_CUDADRV_FUNC_LOG_ERR(
+                cuIpcCloseMemHandle((CUdeviceptr)region->mapped_addr));
+        }
        ucs_free(region);
    }
    ucs_trace("%s: cuda ipc cache purged", cache->name);
 }
 
-static ucs_status_t uct_cuda_ipc_open_memhandle(CUipcMemHandle memh,
+static ucs_status_t uct_cuda_ipc_open_memhandle(const uct_cuda_ipc_key_t *key,
                                                 CUdeviceptr *mapped_addr)
 {
     const char *cu_err_str;
     CUresult cuerr;
+    ucs_status_t status;
 
-    cuerr = cuIpcOpenMemHandle(mapped_addr, memh,
+    cuerr = cuIpcOpenMemHandle(mapped_addr, key->ph,
                                CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS);
-    if (cuerr != CUDA_SUCCESS) {
-        if (cuerr == CUDA_ERROR_ALREADY_MAPPED) {
-            return UCS_ERR_ALREADY_EXISTS;
-        }
-
+    if (cuerr == CUDA_SUCCESS) {
+        status = UCS_OK;
+    } else {
         cuGetErrorString(cuerr, &cu_err_str);
-        ucs_error("cuIpcOpenMemHandle() failed: %s", cu_err_str);
-
-        return UCS_ERR_INVALID_PARAM;
+        ucs_debug("cuIpcOpenMemHandle() failed: %s", cu_err_str);
+        status = (cuerr == CUDA_ERROR_ALREADY_MAPPED) ? UCS_ERR_ALREADY_EXISTS :
+                                                        UCS_ERR_INVALID_PARAM;
     }
 
-    return UCS_OK;
+    return status;
 }
 
 static void uct_cuda_ipc_cache_invalidate_regions(uct_cuda_ipc_cache_t *cache,
@@ -108,14 +145,59 @@ static void uct_cuda_ipc_cache_invalidate_regions(uct_cuda_ipc_cache_t *cache,
               cache->name, from, to);
 }
 
-ucs_status_t uct_cuda_ipc_unmap_memhandle(void *rem_cache, uintptr_t d_bptr,
+static ucs_status_t
+uct_cuda_ipc_get_remote_cache(pid_t pid, uct_cuda_ipc_cache_t **cache)
+{
+    ucs_status_t status = UCS_OK;
+    char target_name[64];
+    uct_cuda_ipc_cache_hash_key_t key;
+    khiter_t khiter;
+    int khret;
+
+    ucs_recursive_spin_lock(&uct_cuda_ipc_remote_cache.lock);
+
+    key.pid = pid;
+    UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetDevice(&key.cu_device));
+
+    khiter = kh_put(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, key,
+                    &khret);
+    if ((khret == UCS_KH_PUT_BUCKET_EMPTY) ||
+        (khret == UCS_KH_PUT_BUCKET_CLEAR)) {
+        ucs_snprintf_safe(target_name, sizeof(target_name), "dest:%d:%d",
+                          key.pid, key.cu_device);
+        status = uct_cuda_ipc_create_cache(cache, target_name);
+        if (status != UCS_OK) {
+            kh_del(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, khiter);
+            ucs_error("could not create cuda ipc cache: %s",
+                      ucs_status_string(status));
+            goto err_unlock;
+        }
+
+        kh_val(&uct_cuda_ipc_remote_cache.hash, khiter) = *cache;
+    } else if (khret == UCS_KH_PUT_KEY_PRESENT) {
+        *cache = kh_val(&uct_cuda_ipc_remote_cache.hash, khiter);
+    } else {
+        ucs_error("unable to use cuda_ipc remote_cache hash");
+        status = UCS_ERR_NO_RESOURCE;
+    }
+err_unlock:
+    ucs_recursive_spin_unlock(&uct_cuda_ipc_remote_cache.lock);
+    return status;
+}
+
+ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
                                           void *mapped_addr, int cache_enabled)
 {
-    uct_cuda_ipc_cache_t *cache = (uct_cuda_ipc_cache_t *) rem_cache;
-    ucs_status_t status = UCS_OK;
+    ucs_status_t status = UCS_OK;
+    uct_cuda_ipc_cache_t *cache;
     ucs_pgt_region_t *pgt_region;
     uct_cuda_ipc_cache_region_t *region;
 
+    status = uct_cuda_ipc_get_remote_cache(pid, &cache);
+    if (status != UCS_OK) {
+        return status;
+    }
+
     /* use write lock because the cache may be modified */
     pthread_rwlock_wrlock(&cache->lock);
     pgt_region = UCS_PROFILE_CALL(ucs_pgtable_lookup, &cache->pgtable, d_bptr);
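`uct_cuda_ipc_get_remote_cache()` above is a classic get-or-create under a lock: `kh_put` reserves the bucket, and only an empty or previously-deleted bucket triggers `uct_cuda_ipc_create_cache`. The lock is recursive so the holding thread can re-acquire it without deadlocking. A toy sketch of such a lock using C11 atomics and pthreads follows; the real `ucs_recursive_spinlock_t` lives in UCS and its internals may differ, so treat this as an assumption about the general shape, not UCX's implementation:

```c
#include <pthread.h>
#include <stdatomic.h>

/* Toy recursive spinlock. Not production-grade: 'owner' is read without
 * synchronization, which is tolerable here only because a non-owner never
 * observes its own thread id in that field. */
typedef struct {
    atomic_flag locked;
    pthread_t   owner;
    int         count;
} rec_spinlock_t;

#define REC_SPINLOCK_INIT { .locked = ATOMIC_FLAG_INIT, .count = 0 }

static void rec_spin_lock(rec_spinlock_t *l)
{
    pthread_t self = pthread_self();

    /* re-entry by the current owner just bumps the nesting count */
    if ((l->count > 0) && pthread_equal(l->owner, self)) {
        l->count++;
        return;
    }

    while (atomic_flag_test_and_set_explicit(&l->locked,
                                             memory_order_acquire)) {
        /* spin until the holder releases */
    }
    l->owner = self;
    l->count = 1;
}

static void rec_spin_unlock(rec_spinlock_t *l)
{
    if (--l->count == 0) {
        atomic_flag_clear_explicit(&l->locked, memory_order_release);
    }
}
```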
@@ -144,16 +226,20 @@ ucs_status_t uct_cuda_ipc_unmap_memhandle(void *rem_cache, uintptr_t d_bptr,
     return status;
 }
 
-UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
-                 (arg, key, mapped_addr),
-                 void *arg, uct_cuda_ipc_key_t *key, void **mapped_addr)
+UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle, (key, mapped_addr),
+                 const uct_cuda_ipc_key_t *key, void **mapped_addr)
 {
-    uct_cuda_ipc_cache_t *cache = (uct_cuda_ipc_cache_t *)arg;
+    uct_cuda_ipc_cache_t *cache;
     ucs_status_t status;
     ucs_pgt_region_t *pgt_region;
     uct_cuda_ipc_cache_region_t *region;
     int ret;
 
+    status = uct_cuda_ipc_get_remote_cache(key->pid, &cache);
+    if (status != UCS_OK) {
+        return status;
+    }
+
     pthread_rwlock_wrlock(&cache->lock);
     pgt_region = UCS_PROFILE_CALL(ucs_pgtable_lookup,
                                   &cache->pgtable, key->d_bptr);
@@ -191,19 +277,22 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
         }
     }
 
-    status = uct_cuda_ipc_open_memhandle(key->ph, (CUdeviceptr *)mapped_addr);
+    status = uct_cuda_ipc_open_memhandle(key, (CUdeviceptr*)mapped_addr);
     if (ucs_unlikely(status != UCS_OK)) {
         if (ucs_likely(status == UCS_ERR_ALREADY_EXISTS)) {
             /* unmap all overlapping regions and retry */
             uct_cuda_ipc_cache_invalidate_regions(cache, (void *)key->d_bptr,
                                                   UCS_PTR_BYTE_OFFSET(key->d_bptr,
                                                                       key->b_len));
-            status = uct_cuda_ipc_open_memhandle(key->ph, (CUdeviceptr *)mapped_addr);
+            status = uct_cuda_ipc_open_memhandle(key,
+                                                 (CUdeviceptr*)mapped_addr);
             if (ucs_unlikely(status != UCS_OK)) {
                 if (ucs_likely(status == UCS_ERR_ALREADY_EXISTS)) {
                     /* unmap all cache entries and retry */
                     uct_cuda_ipc_cache_purge(cache);
-                    status = uct_cuda_ipc_open_memhandle(key->ph, (CUdeviceptr *)mapped_addr);
+                    status =
+                        uct_cuda_ipc_open_memhandle(key,
+                                                    (CUdeviceptr*)mapped_addr);
                     if (status != UCS_OK) {
                         ucs_fatal("%s: failed to open ipc mem handle. addr:%p "
                                   "len:%lu (%s)", cache->name,
@@ -216,8 +305,9 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
                 }
             }
         } else {
-            ucs_fatal("%s: failed to open ipc mem handle. addr:%p len:%lu",
+            ucs_debug("%s: failed to open ipc mem handle. addr:%p len:%lu",
                       cache->name, (void *)key->d_bptr, key->b_len);
+            goto err;
         }
     }
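The map path above escalates on `UCS_ERR_ALREADY_EXISTS` (CUDA's `CUDA_ERROR_ALREADY_MAPPED`): first invalidate only the overlapping cached regions and retry, then purge the whole cache and retry, and only treat a third failure as fatal. A condensed sketch of that cascade, with hypothetical `try_open`/`invalidate_overlaps`/`purge_all` helpers standing in for the UCX calls:

```c
#include <errno.h>

typedef struct { unsigned long base, len; } region_t;

/* hypothetical helpers standing in for cuIpcOpenMemHandle and the cache
 * invalidation calls; these are assumptions, not UCX API */
int  try_open(const region_t *r);           /* returns 0 or EEXIST/EINVAL */
void invalidate_overlaps(const region_t *r);
void purge_all(void);

/* Escalating retry, mirroring uct_cuda_ipc_map_memhandle() above. */
int map_with_retries(const region_t *r)
{
    int rc = try_open(r);

    if (rc == EEXIST) {
        invalidate_overlaps(r); /* cheapest fix: drop only the overlaps */
        rc = try_open(r);
    }
    if (rc == EEXIST) {
        purge_all();            /* still colliding: drop the whole cache */
        rc = try_open(r);
    }
    return rc;                  /* any remaining failure is unrecoverable */
}
```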

@@ -262,8 +352,8 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
     ucs_trace("%s: cuda_ipc cache new region:"UCS_PGT_REGION_FMT" size:%lu",
               cache->name, UCS_PGT_REGION_ARG(&region->super), key->b_len);
 
-    pthread_rwlock_unlock(&cache->lock);
-    return UCS_OK;
+    status = UCS_OK;
+
 err:
     pthread_rwlock_unlock(&cache->lock);
     return status;
@@ -320,3 +410,18 @@ void uct_cuda_ipc_destroy_cache(uct_cuda_ipc_cache_t *cache)
     free(cache->name);
     ucs_free(cache);
 }
+
+UCS_STATIC_INIT {
+    ucs_recursive_spinlock_init(&uct_cuda_ipc_remote_cache.lock, 0);
+    kh_init_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
+}
+
+UCS_STATIC_CLEANUP {
+    uct_cuda_ipc_cache_t *rem_cache;
+
+    kh_foreach_value(&uct_cuda_ipc_remote_cache.hash, rem_cache, {
+        uct_cuda_ipc_destroy_cache(rem_cache);
+    })
+    kh_destroy_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
+    ucs_recursive_spinlock_destroy(&uct_cuda_ipc_remote_cache.lock);
+}
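`UCS_STATIC_INIT`/`UCS_STATIC_CLEANUP` run when the library is loaded and unloaded, so the global hash and its lock exist before any endpoint is created. On GCC-compatible compilers this boils down to constructor/destructor functions; a standalone sketch of the pattern (the attribute-based expansion is how UCS implements these macros, to my knowledge, but treat the exact expansion as an assumption):

```c
#include <stdio.h>

static int global_initialized;

__attribute__((constructor))
static void my_static_init(void)
{
    /* runs before main() or at dlopen(): mirrors initializing the global
     * cuda-ipc hash and its lock above */
    global_initialized = 1;
}

__attribute__((destructor))
static void my_static_cleanup(void)
{
    /* runs at exit or dlclose(): mirrors destroying the cached entries */
    global_initialized = 0;
}

int main(void)
{
    printf("initialized: %d\n", global_initialized); /* prints 1 */
    return 0;
}
```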
16 changes: 8 additions & 8 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.h
@@ -9,16 +9,16 @@
 
 #include <ucs/datastruct/pgtable.h>
 #include <ucs/datastruct/list.h>
+#include <ucs/type/init_once.h>
+#include <ucs/type/spinlock.h>
 #include "cuda_ipc_md.h"
 #include <cuda.h>
 #include <cuda_runtime.h>
 
 
-typedef struct uct_cuda_ipc_cache uct_cuda_ipc_cache_t;
-typedef struct uct_cuda_ipc_cache_region uct_cuda_ipc_cache_region_t;
-
-
-typedef struct uct_cuda_ipc_rem_memh uct_cuda_ipc_rem_memh_t;
+typedef struct uct_cuda_ipc_cache        uct_cuda_ipc_cache_t;
+typedef struct uct_cuda_ipc_cache_region uct_cuda_ipc_cache_region_t;
+typedef struct uct_cuda_ipc_rem_memh     uct_cuda_ipc_rem_memh_t;
 
 
 struct uct_cuda_ipc_cache_region {
@@ -44,8 +44,8 @@ ucs_status_t uct_cuda_ipc_create_cache(uct_cuda_ipc_cache_t **cache,
 void uct_cuda_ipc_destroy_cache(uct_cuda_ipc_cache_t *cache);
 
 
-ucs_status_t uct_cuda_ipc_map_memhandle(void *arg, uct_cuda_ipc_key_t *key,
-                                        void **mapped_addr);
-ucs_status_t uct_cuda_ipc_unmap_memhandle(void *rem_cache, uintptr_t d_bptr,
+ucs_status_t
+uct_cuda_ipc_map_memhandle(const uct_cuda_ipc_key_t *key, void **mapped_addr);
+ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
                                           void *mapped_addr, int cache_enabled);
 #endif
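With the new prototypes, callers no longer thread a cache handle through: mapping needs only the remote key, and unmapping only the owner pid plus the remote base pointer. A hypothetical caller sketch with stand-in types so it compiles on its own (the `uct_cuda_ipc_key_t` layout here is illustrative, not the real UCX definition):

```c
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>

typedef int ucs_status_t; /* stand-in so the sketch is self-contained */
#define UCS_OK 0

typedef struct {
    pid_t     pid;    /* owner process of the exported allocation */
    uintptr_t d_bptr; /* base device pointer in the owner's VA space */
    size_t    b_len;  /* allocation length */
    /* ... CUipcMemHandle ph; ... */
} uct_cuda_ipc_key_t;

ucs_status_t uct_cuda_ipc_map_memhandle(const uct_cuda_ipc_key_t *key,
                                        void **mapped_addr);
ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
                                          void *mapped_addr, int cache_enabled);

static ucs_status_t copy_from_peer(const uct_cuda_ipc_key_t *key)
{
    void *mapped;
    ucs_status_t status = uct_cuda_ipc_map_memhandle(key, &mapped);
    if (status != UCS_OK) {
        return status;
    }

    /* ... synchronous device-to-device copy using 'mapped' ... */

    /* with cache_enabled=1 the mapping stays cached for future transfers */
    return uct_cuda_ipc_unmap_memhandle(key->pid, key->d_bptr, mapped, 1);
}
```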
22 changes: 2 additions & 20 deletions src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
@@ -26,32 +26,15 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_ep_t, const uct_ep_params_t *params)
 {
     uct_cuda_ipc_iface_t *iface = ucs_derived_of(params->iface,
                                                  uct_cuda_ipc_iface_t);
-    ucs_status_t status;
-    char target_name[64];
 
     UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
     UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);
-    self->remote_memh_cache = NULL;
-
-    /* create a cache by default; disabling it implies removing the mapping
-     * immediately after use */
-    snprintf(target_name, sizeof(target_name), "dest:%d",
-             *(pid_t*)params->iface_addr);
-    status = uct_cuda_ipc_create_cache(&self->remote_memh_cache, target_name);
-    if (status != UCS_OK) {
-        ucs_error("could not create cuda ipc cache: %s",
-                  ucs_status_string(status));
-        return status;
-    }
 
     return UCS_OK;
 }
 
 static UCS_CLASS_CLEANUP_FUNC(uct_cuda_ipc_ep_t)
 {
-    if (self->remote_memh_cache) {
-        uct_cuda_ipc_destroy_cache(self->remote_memh_cache);
-    }
 }
 
 UCS_CLASS_DEFINE(uct_cuda_ipc_ep_t, uct_base_ep_t)
@@ -67,7 +50,6 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
                                   uct_completion_t *comp, int direction)
 {
     uct_cuda_ipc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_cuda_ipc_iface_t);
-    uct_cuda_ipc_ep_t *ep = ucs_derived_of(tl_ep, uct_cuda_ipc_ep_t);
     uct_cuda_ipc_key_t *key = (uct_cuda_ipc_key_t *) rkey;
     void *mapped_rem_addr;
     void *mapped_addr;
@@ -83,7 +65,7 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
         return UCS_OK;
     }
 
-    status = iface->map_memhandle((void *)ep->remote_memh_cache, key, &mapped_addr);
+    status = uct_cuda_ipc_map_memhandle(key, &mapped_addr);
     if (status != UCS_OK) {
         return UCS_ERR_IO_ERROR;
     }
@@ -135,8 +117,8 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
     ucs_queue_push(outstanding_queue, &cuda_ipc_event->queue);
     cuda_ipc_event->comp        = comp;
     cuda_ipc_event->mapped_addr = mapped_addr;
-    cuda_ipc_event->cache       = ep->remote_memh_cache;
     cuda_ipc_event->d_bptr      = (uintptr_t)key->d_bptr;
+    cuda_ipc_event->pid         = key->pid;
     ucs_trace("cuMemcpyDtoDAsync issued :%p dst:%p, src:%p len:%ld",
               cuda_ipc_event, (void *) dst, (void *) src, iov[0].length);
     return UCS_INPROGRESS;
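Since the endpoint no longer owns a cache, the async-copy event records `key->pid` so the completion path can re-resolve the global cache keyed by (pid, current device). A sketch of the implied bookkeeping; the field names `mapped_addr`/`d_bptr`/`pid` come from the diff, while the rest of the layout and the `enable_cache` flag are assumptions:

```c
#include <stdint.h>
#include <sys/types.h>

typedef struct uct_cuda_ipc_event {
    /* ... queue link, uct_completion_t *comp, CUDA stream/event ... */
    void      *mapped_addr; /* local mapping to release when the copy ends */
    uintptr_t  d_bptr;      /* remote base pointer: the cache lookup key */
    pid_t      pid;         /* owner pid, replacing the old cache pointer */
} uct_cuda_ipc_event_t;

/* On completion, the progress path can re-resolve the global cache:
 *
 *     uct_cuda_ipc_unmap_memhandle(event->pid, event->d_bptr,
 *                                  event->mapped_addr, enable_cache);
 */
```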
1 change: 0 additions & 1 deletion src/uct/cuda/cuda_ipc/cuda_ipc_ep.h
@@ -18,7 +18,6 @@ typedef struct uct_cuda_ipc_ep_addr {
 
 typedef struct uct_cuda_ipc_ep {
     uct_base_ep_t super;
-    uct_cuda_ipc_cache_t *remote_memh_cache;
 } uct_cuda_ipc_ep_t;
 
 UCS_CLASS_DECLARE_NEW_FUNC(uct_cuda_ipc_ep_t, uct_ep_t, const uct_ep_params_t *);