diff --git a/src/tools/perf/lib/libperf.c b/src/tools/perf/lib/libperf.c
index 71196f0b54d..c97440707b8 100644
--- a/src/tools/perf/lib/libperf.c
+++ b/src/tools/perf/lib/libperf.c
@@ -1866,6 +1866,12 @@ static ucs_status_t ucx_perf_thread_run_test(void* arg)
     ucx_perf_params_t* params = &perf->params;
     ucs_status_t status;
 
+    /* new threads need explicit device association */
+    status = perf->allocator->init(perf);
+    if (status != UCS_OK) {
+        goto out;
+    }
+
     if (params->warmup_iter > 0) {
         ucx_perf_set_warmup(perf, params);
         status = ucx_perf_funcs[params->api].run(perf);
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_cache.c b/src/uct/cuda/cuda_ipc/cuda_ipc_cache.c
index d6151d52edc..a2afd15eabb 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_cache.c
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_cache.c
@@ -9,11 +9,44 @@
 #endif
 
 #include "cuda_ipc_cache.h"
+#include "cuda_ipc_iface.h"
 #include <ucs/debug/log.h>
 #include <ucs/debug/memtrack.h>
 #include <ucs/profile/profile.h>
 #include <ucs/sys/string.h>
+#include <ucs/type/spinlock.h>
 #include <ucs/sys/sys.h>
+#include <ucs/datastruct/khash.h>
+
+
+typedef struct uct_cuda_ipc_cache_hash_key {
+    pid_t    pid;
+    CUdevice cu_device;
+} uct_cuda_ipc_cache_hash_key_t;
+
+static UCS_F_ALWAYS_INLINE int
+uct_cuda_ipc_cache_hash_equal(uct_cuda_ipc_cache_hash_key_t key1,
+                              uct_cuda_ipc_cache_hash_key_t key2)
+{
+    return (key1.pid == key2.pid) && (key1.cu_device == key2.cu_device);
+}
+
+static UCS_F_ALWAYS_INLINE khint32_t
+uct_cuda_ipc_cache_hash_func(uct_cuda_ipc_cache_hash_key_t key)
+{
+    return kh_int_hash_func((key.pid << 8) | key.cu_device);
+}
+
+KHASH_INIT(cuda_ipc_rem_cache, uct_cuda_ipc_cache_hash_key_t,
+           uct_cuda_ipc_cache_t*, 1, uct_cuda_ipc_cache_hash_func,
+           uct_cuda_ipc_cache_hash_equal);
+
+typedef struct uct_cuda_ipc_remote_cache {
+    khash_t(cuda_ipc_rem_cache) hash;
+    ucs_recursive_spinlock_t    lock;
+} uct_cuda_ipc_remote_cache_t;
+
+uct_cuda_ipc_remote_cache_t uct_cuda_ipc_remote_cache;
 
 static ucs_pgt_dir_t *uct_cuda_ipc_cache_pgt_dir_alloc(const ucs_pgtable_t *pgtable)
 {
@@ -48,38 +81,42 @@ static void uct_cuda_ipc_cache_purge(uct_cuda_ipc_cache_t *cache)
 {
     uct_cuda_ipc_cache_region_t *region, *tmp;
     ucs_list_link_t region_list;
+    int active;
+
+    UCT_CUDADRV_CTX_ACTIVE(active);
 
     ucs_list_head_init(&region_list);
     ucs_pgtable_purge(&cache->pgtable, uct_cuda_ipc_cache_region_collect_callback,
                       &region_list);
     ucs_list_for_each_safe(region, tmp, &region_list, list) {
-        UCT_CUDADRV_FUNC_LOG_ERR(
-                cuIpcCloseMemHandle((CUdeviceptr)region->mapped_addr));
+        if (active) {
+            UCT_CUDADRV_FUNC_LOG_ERR(
+                    cuIpcCloseMemHandle((CUdeviceptr)region->mapped_addr));
+        }
         ucs_free(region);
     }
     ucs_trace("%s: cuda ipc cache purged", cache->name);
 }
 
-static ucs_status_t uct_cuda_ipc_open_memhandle(CUipcMemHandle memh,
+static ucs_status_t uct_cuda_ipc_open_memhandle(const uct_cuda_ipc_key_t *key,
                                                 CUdeviceptr *mapped_addr)
 {
     const char *cu_err_str;
     CUresult cuerr;
+    ucs_status_t status;
 
-    cuerr = cuIpcOpenMemHandle(mapped_addr, memh,
+    cuerr = cuIpcOpenMemHandle(mapped_addr, key->ph,
                                CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS);
-    if (cuerr != CUDA_SUCCESS) {
-        if (cuerr == CUDA_ERROR_ALREADY_MAPPED) {
-            return UCS_ERR_ALREADY_EXISTS;
-        }
-
+    if (cuerr == CUDA_SUCCESS) {
+        status = UCS_OK;
+    } else {
         cuGetErrorString(cuerr, &cu_err_str);
-        ucs_error("cuIpcOpenMemHandle() failed: %s", cu_err_str);
-
-        return UCS_ERR_INVALID_PARAM;
+        ucs_debug("cuIpcOpenMemHandle() failed: %s", cu_err_str);
+        status = (cuerr == CUDA_ERROR_ALREADY_MAPPED) ? UCS_ERR_ALREADY_EXISTS :
+                                                        UCS_ERR_INVALID_PARAM;
     }
 
-    return UCS_OK;
+    return status;
 }
 
 static void uct_cuda_ipc_cache_invalidate_regions(uct_cuda_ipc_cache_t *cache,
@@ -108,14 +145,59 @@ static void uct_cuda_ipc_cache_invalidate_regions(uct_cuda_ipc_cache_t *cache,
               cache->name, from, to);
 }
 
-ucs_status_t uct_cuda_ipc_unmap_memhandle(void *rem_cache, uintptr_t d_bptr,
+static ucs_status_t
+uct_cuda_ipc_get_remote_cache(pid_t pid, uct_cuda_ipc_cache_t **cache)
+{
+    ucs_status_t status = UCS_OK;
+    char target_name[64];
+    uct_cuda_ipc_cache_hash_key_t key;
+    khiter_t khiter;
+    int khret;
+
+    ucs_recursive_spin_lock(&uct_cuda_ipc_remote_cache.lock);
+
+    key.pid = pid;
+    UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetDevice(&key.cu_device));
+
+    khiter = kh_put(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, key,
+                    &khret);
+    if ((khret == UCS_KH_PUT_BUCKET_EMPTY) ||
+        (khret == UCS_KH_PUT_BUCKET_CLEAR)) {
+        ucs_snprintf_safe(target_name, sizeof(target_name), "dest:%d:%d",
+                          key.pid, key.cu_device);
+        status = uct_cuda_ipc_create_cache(cache, target_name);
+        if (status != UCS_OK) {
+            kh_del(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, khiter);
+            ucs_error("could not create cuda ipc cache: %s",
+                      ucs_status_string(status));
+            goto err_unlock;
+        }
+
+        kh_val(&uct_cuda_ipc_remote_cache.hash, khiter) = *cache;
+    } else if (khret == UCS_KH_PUT_KEY_PRESENT) {
+        *cache = kh_val(&uct_cuda_ipc_remote_cache.hash, khiter);
+    } else {
+        ucs_error("unable to use cuda_ipc remote_cache hash");
+        status = UCS_ERR_NO_RESOURCE;
+    }
+err_unlock:
+    ucs_recursive_spin_unlock(&uct_cuda_ipc_remote_cache.lock);
+    return status;
+}
+
+ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
                                           void *mapped_addr, int cache_enabled)
 {
-    uct_cuda_ipc_cache_t *cache = (uct_cuda_ipc_cache_t *) rem_cache;
-    ucs_status_t status = UCS_OK;
+    ucs_status_t status = UCS_OK;
+    uct_cuda_ipc_cache_t *cache;
     ucs_pgt_region_t *pgt_region;
     uct_cuda_ipc_cache_region_t *region;
 
+    status = uct_cuda_ipc_get_remote_cache(pid, &cache);
+    if (status != UCS_OK) {
+        return status;
+    }
+
     /* use write lock because cache maybe modified */
     pthread_rwlock_wrlock(&cache->lock);
     pgt_region = UCS_PROFILE_CALL(ucs_pgtable_lookup, &cache->pgtable, d_bptr);
@@ -144,16 +226,20 @@ ucs_status_t uct_cuda_ipc_unmap_memhandle(void *rem_cache, uintptr_t d_bptr,
     return status;
 }
 
-UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
-                 (arg, key, mapped_addr),
-                 void *arg, uct_cuda_ipc_key_t *key, void **mapped_addr)
+UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle, (key, mapped_addr),
+                 const uct_cuda_ipc_key_t *key, void **mapped_addr)
 {
-    uct_cuda_ipc_cache_t *cache = (uct_cuda_ipc_cache_t *)arg;
+    uct_cuda_ipc_cache_t *cache;
     ucs_status_t status;
     ucs_pgt_region_t *pgt_region;
     uct_cuda_ipc_cache_region_t *region;
     int ret;
 
+    status = uct_cuda_ipc_get_remote_cache(key->pid, &cache);
+    if (status != UCS_OK) {
+        return status;
+    }
+
     pthread_rwlock_wrlock(&cache->lock);
     pgt_region = UCS_PROFILE_CALL(ucs_pgtable_lookup,
                                   &cache->pgtable, key->d_bptr);
@@ -191,19 +277,22 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
         }
     }
 
-    status = uct_cuda_ipc_open_memhandle(key->ph, (CUdeviceptr *)mapped_addr);
+    status = uct_cuda_ipc_open_memhandle(key, (CUdeviceptr*)mapped_addr);
     if (ucs_unlikely(status != UCS_OK)) {
        if (ucs_likely(status == UCS_ERR_ALREADY_EXISTS)) {
             /* unmap all overlapping regions and retry*/
             uct_cuda_ipc_cache_invalidate_regions(cache, (void *)key->d_bptr,
                                                   UCS_PTR_BYTE_OFFSET(key->d_bptr,
                                                                       key->b_len));
-            status = uct_cuda_ipc_open_memhandle(key->ph, (CUdeviceptr *)mapped_addr);
+            status = uct_cuda_ipc_open_memhandle(key,
+                                                 (CUdeviceptr*)mapped_addr);
             if (ucs_unlikely(status != UCS_OK)) {
                 if (ucs_likely(status == UCS_ERR_ALREADY_EXISTS)) {
                     /* unmap all cache entries and retry */
                     uct_cuda_ipc_cache_purge(cache);
-                    status = uct_cuda_ipc_open_memhandle(key->ph, (CUdeviceptr *)mapped_addr);
+                    status =
+                        uct_cuda_ipc_open_memhandle(key,
+                                                    (CUdeviceptr*)mapped_addr);
                     if (status != UCS_OK) {
                         ucs_fatal("%s: failed to open ipc mem handle. addr:%p "
                                   "len:%lu (%s)", cache->name,
@@ -216,8 +305,9 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
                 }
             }
         } else {
-            ucs_fatal("%s: failed to open ipc mem handle. addr:%p len:%lu",
+            ucs_debug("%s: failed to open ipc mem handle. addr:%p len:%lu",
                       cache->name, (void *)key->d_bptr, key->b_len);
+            goto err;
         }
     }
@@ -262,8 +352,8 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
 
     ucs_trace("%s: cuda_ipc cache new region:"UCS_PGT_REGION_FMT" size:%lu",
               cache->name, UCS_PGT_REGION_ARG(&region->super), key->b_len);
-    pthread_rwlock_unlock(&cache->lock);
-    return UCS_OK;
+    status = UCS_OK;
+
 err:
     pthread_rwlock_unlock(&cache->lock);
     return status;
@@ -320,3 +410,18 @@ void uct_cuda_ipc_destroy_cache(uct_cuda_ipc_cache_t *cache)
     free(cache->name);
     ucs_free(cache);
 }
+
+UCS_STATIC_INIT {
+    ucs_recursive_spinlock_init(&uct_cuda_ipc_remote_cache.lock, 0);
+    kh_init_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
+}
+
+UCS_STATIC_CLEANUP {
+    uct_cuda_ipc_cache_t *rem_cache;
+
+    kh_foreach_value(&uct_cuda_ipc_remote_cache.hash, rem_cache, {
+        uct_cuda_ipc_destroy_cache(rem_cache);
+    })
+    kh_destroy_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
+    ucs_recursive_spinlock_destroy(&uct_cuda_ipc_remote_cache.lock);
+}
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_cache.h b/src/uct/cuda/cuda_ipc/cuda_ipc_cache.h
index 588f5c97c11..d3a948f6288 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_cache.h
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_cache.h
@@ -9,16 +9,16 @@
 
 #include <ucs/datastruct/pgtable.h>
 #include <ucs/datastruct/list.h>
+#include <ucs/datastruct/khash.h>
+#include <ucs/type/spinlock.h>
 #include "cuda_ipc_md.h"
 #include <cuda.h>
 #include <pthread.h>
 
-typedef struct uct_cuda_ipc_cache uct_cuda_ipc_cache_t;
-typedef struct uct_cuda_ipc_cache_region uct_cuda_ipc_cache_region_t;
-
-
-typedef struct uct_cuda_ipc_rem_memh uct_cuda_ipc_rem_memh_t;
+typedef struct uct_cuda_ipc_cache        uct_cuda_ipc_cache_t;
+typedef struct uct_cuda_ipc_cache_region uct_cuda_ipc_cache_region_t;
+typedef struct uct_cuda_ipc_rem_memh     uct_cuda_ipc_rem_memh_t;
 
 
 struct uct_cuda_ipc_cache_region {
@@ -44,8 +44,8 @@ ucs_status_t uct_cuda_ipc_create_cache(uct_cuda_ipc_cache_t **cache,
 
 void uct_cuda_ipc_destroy_cache(uct_cuda_ipc_cache_t *cache);
 
-ucs_status_t uct_cuda_ipc_map_memhandle(void *arg, uct_cuda_ipc_key_t *key,
-                                        void **mapped_addr);
-ucs_status_t uct_cuda_ipc_unmap_memhandle(void *rem_cache, uintptr_t d_bptr,
+ucs_status_t
+uct_cuda_ipc_map_memhandle(const uct_cuda_ipc_key_t *key, void **mapped_addr);
+ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
                                           void *mapped_addr, int cache_enabled);
 #endif
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c b/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
index 886ffbcb4ca..e67c4ce1c69 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
@@ -26,32 +26,15 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_ep_t, const uct_ep_params_t *params)
 {
     uct_cuda_ipc_iface_t *iface = ucs_derived_of(params->iface,
                                                  uct_cuda_ipc_iface_t);
-    ucs_status_t status;
-    char target_name[64];
 
     UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
     UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);
 
-    self->remote_memh_cache = NULL;
-
-    /* create a cache by default; disabling implies remove mapping immediately
-     * after use */
-    snprintf(target_name, sizeof(target_name), "dest:%d",
-             *(pid_t*)params->iface_addr);
-    status = uct_cuda_ipc_create_cache(&self->remote_memh_cache, target_name);
-    if (status != UCS_OK) {
-        ucs_error("could not create create cuda ipc cache: %s",
-                  ucs_status_string(status));
-        return status;
-    }
-
     return UCS_OK;
 }
 
 static UCS_CLASS_CLEANUP_FUNC(uct_cuda_ipc_ep_t)
 {
-    if (self->remote_memh_cache) {
-        uct_cuda_ipc_destroy_cache(self->remote_memh_cache);
-    }
 }
 
 UCS_CLASS_DEFINE(uct_cuda_ipc_ep_t, uct_base_ep_t)
@@ -67,7 +50,6 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
                                   uct_completion_t *comp, int direction)
 {
     uct_cuda_ipc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_cuda_ipc_iface_t);
-    uct_cuda_ipc_ep_t *ep       = ucs_derived_of(tl_ep, uct_cuda_ipc_ep_t);
     uct_cuda_ipc_key_t *key     = (uct_cuda_ipc_key_t *) rkey;
     void *mapped_rem_addr;
     void *mapped_addr;
@@ -83,7 +65,7 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
         return UCS_OK;
     }
 
-    status = iface->map_memhandle((void *)ep->remote_memh_cache, key, &mapped_addr);
+    status = uct_cuda_ipc_map_memhandle(key, &mapped_addr);
     if (status != UCS_OK) {
         return UCS_ERR_IO_ERROR;
     }
@@ -135,8 +117,8 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
     ucs_queue_push(outstanding_queue, &cuda_ipc_event->queue);
     cuda_ipc_event->comp        = comp;
     cuda_ipc_event->mapped_addr = mapped_addr;
-    cuda_ipc_event->cache       = ep->remote_memh_cache;
     cuda_ipc_event->d_bptr      = (uintptr_t)key->d_bptr;
+    cuda_ipc_event->pid         = key->pid;
     ucs_trace("cuMemcpyDtoDAsync issued :%p dst:%p, src:%p len:%ld",
               cuda_ipc_event, (void *) dst, (void *) src, iov[0].length);
     return UCS_INPROGRESS;
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_ep.h b/src/uct/cuda/cuda_ipc/cuda_ipc_ep.h
index 4be71d28f73..edae452b09f 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_ep.h
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_ep.h
@@ -18,7 +18,6 @@ typedef struct uct_cuda_ipc_ep_addr {
 
 typedef struct uct_cuda_ipc_ep {
     uct_base_ep_t         super;
-    uct_cuda_ipc_cache_t *remote_memh_cache;
 } uct_cuda_ipc_ep_t;
 
 UCS_CLASS_DECLARE_NEW_FUNC(uct_cuda_ipc_ep_t, uct_ep_t, const uct_ep_params_t *);
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
index b56a8dab5cc..19d01f4a2f2 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
@@ -17,6 +17,7 @@
 #include <uct/cuda/base/cuda_iface.h>
 #include <ucs/type/class.h>
 #include <ucs/sys/string.h>
+#include <sys/types.h>
 
 
 static ucs_config_field_t uct_cuda_ipc_iface_config_table[] = {
@@ -251,10 +252,10 @@ uct_cuda_ipc_progress_event_q(uct_cuda_ipc_iface_t *iface,
             uct_invoke_completion(cuda_ipc_event->comp, UCS_OK);
         }
 
-        status = iface->unmap_memhandle(cuda_ipc_event->cache,
-                                        cuda_ipc_event->d_bptr,
-                                        cuda_ipc_event->mapped_addr,
-                                        iface->config.enable_cache);
+        status = uct_cuda_ipc_unmap_memhandle(cuda_ipc_event->pid,
+                                              cuda_ipc_event->d_bptr,
+                                              cuda_ipc_event->mapped_addr,
+                                              iface->config.enable_cache);
         if (status != UCS_OK) {
             ucs_fatal("failed to unmap addr:%p", cuda_ipc_event->mapped_addr);
         }
@@ -428,9 +429,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_iface_t, uct_md_h md, uct_worker_h worker,
     self->config.enable_cache        = config->enable_cache;
     self->config.max_cuda_ipc_events = config->max_cuda_ipc_events;
 
-    self->map_memhandle   = uct_cuda_ipc_map_memhandle;
-    self->unmap_memhandle = uct_cuda_ipc_unmap_memhandle;
-
     status = ucs_mpool_init(&self->event_desc,
                             0,
                             sizeof(uct_cuda_ipc_event_desc_t),
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
index 42e135a92e1..5eb99791306 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
@@ -18,7 +18,6 @@
 
 #define UCT_CUDA_IPC_MAX_PEERS 16
 
-
 typedef struct uct_cuda_ipc_iface {
     uct_base_iface_t super;
     ucs_mpool_t      event_desc; /* cuda event desc */
@@ -35,10 +34,6 @@ typedef struct uct_cuda_ipc_iface {
         unsigned max_cuda_ipc_events; /* max mpool entries */
         int      enable_cache;        /* enable/disable ipc handle cache */
     } config;
-    ucs_status_t (*map_memhandle)(void *context, uct_cuda_ipc_key_t *key,
-                                  void **map_addr);
-    ucs_status_t (*unmap_memhandle)(void *rem_cache, uintptr_t d_bptr,
-                                    void *mapped_addr, int cache_enabled);
 } uct_cuda_ipc_iface_t;
 
 
@@ -58,8 +53,8 @@ typedef struct uct_cuda_ipc_event_desc {
     uct_completion_t  *comp;
     ucs_queue_elem_t   queue;
     uct_cuda_ipc_ep_t *ep;
-    void              *cache;
     uintptr_t          d_bptr;
+    pid_t              pid;
 } uct_cuda_ipc_event_desc_t;
 
 
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_md.c b/src/uct/cuda/cuda_ipc/cuda_ipc_md.c
index b8cc33d5a5d..a8b090a6b85 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_md.c
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_md.c
@@ -9,6 +9,7 @@
 #endif
 
 #include "cuda_ipc_md.h"
+#include "cuda_ipc_cache.h"
 
 #include <string.h>
 #include <limits.h>
@@ -75,6 +76,9 @@ ucs_status_t uct_cuda_ipc_get_unique_index_for_uuid(int* idx,
                                                     uct_cuda_ipc_key_t *rkey)
 {
     int i;
+    int num_devices;
+    int original_capacity, new_capacity;
+    int original_count, new_count;
 
     for (i = 0; i < md->uuid_map_size; i++) {
         if (uct_cuda_ipc_uuid_equals(&rkey->uuid, &md->uuid_map[i])) {
@@ -85,13 +89,12 @@ ucs_status_t uct_cuda_ipc_get_unique_index_for_uuid(int* idx,
 
     if (ucs_unlikely(md->uuid_map_size == md->uuid_map_capacity)) {
         /* reallocate on demand */
-        int num_devices;
-        int original_cache_size, new_cache_size;
-        int new_capacity = md->uuid_map_capacity * 2;
-
         UCT_CUDA_IPC_DEVICE_GET_COUNT(num_devices);
-        original_cache_size = md->uuid_map_capacity * num_devices;
-        new_cache_size      = new_capacity * num_devices;
+        original_capacity = md->uuid_map_capacity;
+        new_capacity      = md->uuid_map_capacity ?
+                            (md->uuid_map_capacity * 2) : 16;
+        original_count    = original_capacity * num_devices;
+        new_count         = new_capacity * num_devices;
         md->uuid_map_capacity = new_capacity;
         md->uuid_map = ucs_realloc(md->uuid_map, new_capacity * sizeof(CUuuid),
@@ -101,14 +104,16 @@ ucs_status_t uct_cuda_ipc_get_unique_index_for_uuid(int* idx,
         }
 
         md->peer_accessible_cache = ucs_realloc(md->peer_accessible_cache,
-                                                new_cache_size,
+                                                new_count *
+                                                sizeof(ucs_ternary_auto_value_t),
                                                 "uct_cuda_ipc_peer_accessible_cache");
         if (md->peer_accessible_cache == NULL) {
             return UCS_ERR_NO_MEMORY;
         }
 
-        memset(md->peer_accessible_cache + original_cache_size, 0xFF,
-               new_cache_size - original_cache_size);
+        for (i = original_count; i < new_count; i++) {
+            md->peer_accessible_cache[i] = UCS_TRY;
+        }
     }
 
     /* Add new mapping */
@@ -125,12 +130,12 @@ static ucs_status_t uct_cuda_ipc_is_peer_accessible(uct_cuda_ipc_component_t *mdc,
     ucs_status_t status;
     int peer_idx;
     int num_devices;
-    char* accessible;
-    CUdeviceptr d_mapped;
+    ucs_ternary_auto_value_t *accessible;
+    void *d_mapped;
 
     status = uct_cuda_ipc_get_unique_index_for_uuid(&peer_idx, mdc->md, rkey);
     if (ucs_unlikely(status != UCS_OK)) {
-        return status;
+        goto err;
     }
 
     /* overwrite dev_num with a unique ID; this means that relative remote
@@ -138,23 +143,40 @@ static ucs_status_t uct_cuda_ipc_is_peer_accessible(uct_cuda_ipc_component_t *mdc,
      * stream sequentialization */
     rkey->dev_num = peer_idx;
 
-    UCT_CUDA_IPC_GET_DEVICE(this_device);
-    UCT_CUDA_IPC_DEVICE_GET_COUNT(num_devices);
+    if ((CUDA_SUCCESS != cuCtxGetDevice(&this_device)) ||
+        (CUDA_SUCCESS != cuDeviceGetCount(&num_devices))) {
+        status = UCS_ERR_UNREACHABLE;
+        goto err;
+    }
 
     accessible = &mdc->md->peer_accessible_cache[peer_idx * num_devices
                                                  + this_device];
-    if (*accessible == (char)0xFF) { /* unchecked, add to cache */
-        CUresult result = cuIpcOpenMemHandle(&d_mapped,
-                                             rkey->ph,
-                                             CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS);
-        *accessible = ((result != CUDA_SUCCESS) && (result != CUDA_ERROR_ALREADY_MAPPED))
-                      ? 0 : 1;
-        if (result == CUDA_SUCCESS) {
-            result = cuIpcCloseMemHandle(d_mapped);
-            if (result != CUDA_SUCCESS) ucs_fatal("Unable to close memhandle");
-        }
+    if (*accessible == UCS_TRY) { /* unchecked, add to cache */
+        /* Check if the peer is reachable by trying to open its memory handle.
+         * This is necessary when the device is not visible through
+         * CUDA_VISIBLE_DEVICES, so peer accessibility cannot be queried
+         * through the CUDA driver API.
+         * Previously, reachability was checked by opening a memory handle and
+         * immediately closing it, because the memory handle cache was not
+         * globally visible. With multiple threads this is racy: one thread may
+         * check reachability, later open the handle, and save the mapped
+         * pointer in the cache as part of a put/get operation, while another
+         * thread closes the same memory handle during its own reachability
+         * check. That leads to a cuMemcpyAsync error when the mapped pointer
+         * is accessed by the put/get operation.
+         * Now we insert into the cache immediately, which also saves a second
+         * cuIpcOpenMemHandle() call for the same handle; the cache is globally
+         * accessible through rkey->pid. */
+        status = uct_cuda_ipc_map_memhandle(rkey, &d_mapped);
+
+        *accessible = ((status == UCS_OK) || (status == UCS_ERR_ALREADY_EXISTS))
+                      ? UCS_YES : UCS_NO;
     }
 
-    return (*accessible == 1) ? UCS_OK : UCS_ERR_UNREACHABLE;
+    return (*accessible == UCS_YES) ? UCS_OK : UCS_ERR_UNREACHABLE;
+
+err:
+    return status;
 }
 
 UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_rkey_unpack,
@@ -207,6 +229,7 @@ uct_cuda_ipc_mem_reg_internal(uct_md_h uct_md, void *addr, size_t length,
 
     log_level = (flags & UCT_MD_MEM_FLAG_HIDE_ERRORS) ? UCS_LOG_LEVEL_DEBUG :
                                                         UCS_LOG_LEVEL_ERROR;
+
     status = UCT_CUDADRV_FUNC(cuIpcGetMemHandle(&key->ph, (CUdeviceptr)addr),
                               log_level);
     if (UCS_OK != status) {
@@ -220,6 +243,7 @@ uct_cuda_ipc_mem_reg_internal(uct_md_h uct_md, void *addr, size_t length,
                               log_level);
 
     key->dev_num = (int) cu_device;
+    key->pid     = getpid();
     ucs_trace("registered memory:%p..%p length:%lu dev_num:%d",
               addr, UCS_PTR_BYTE_OFFSET(addr, length), length, (int) cu_device);
     return UCS_OK;
@@ -280,7 +304,6 @@ uct_cuda_ipc_md_open(uct_component_t *component, const char *md_name,
     uct_cuda_ipc_md_t* md;
     uct_cuda_ipc_component_t* com;
 
-    UCS_STATIC_ASSERT(sizeof(md->peer_accessible_cache[0]) == sizeof(char));
     UCT_CUDA_IPC_DEVICE_GET_COUNT(num_devices);
 
     md = ucs_calloc(1, sizeof(uct_cuda_ipc_md_t), "uct_cuda_ipc_md");
@@ -292,26 +315,10 @@ uct_cuda_ipc_md_open(uct_component_t *component, const char *md_name,
     md->super.component = &uct_cuda_ipc_component.super;
 
     /* allocate uuid map and peer accessible cache */
-    md->uuid_map_size     = 0;
-    md->uuid_map_capacity = 16;
-    md->uuid_map          = ucs_malloc(md->uuid_map_capacity * sizeof(CUuuid),
-                                       "uct_cuda_ipc_uuid_map");
-    if (md->uuid_map == NULL) {
-        free(md);
-        return UCS_ERR_NO_MEMORY;
-    }
-
-    /* Initially support caching accessibility of up to 16 other peers */
-    md->peer_accessible_cache = ucs_malloc(num_devices * md->uuid_map_capacity,
-                                           "uct_cuda_ipc_peer_accessible_cache");
-    if (md->peer_accessible_cache == NULL) {
-        free(md->uuid_map);
-        free(md);
-        return UCS_ERR_NO_MEMORY;
-    }
-
-    /* 0xFF = !cached, 1 = accessible, 0 = !accessible */
-    memset(md->peer_accessible_cache, 0xFF, num_devices * md->uuid_map_capacity);
+    md->uuid_map_size         = 0;
+    md->uuid_map_capacity     = 0;
+    md->uuid_map              = NULL;
+    md->peer_accessible_cache = NULL;
 
     com = ucs_derived_of(md->super.component, uct_cuda_ipc_component_t);
     com->md = md;
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_md.h b/src/uct/cuda/cuda_ipc/cuda_ipc_md.h
index 5e0ef493867..ab2ea3b02b6 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_md.h
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_md.h
@@ -10,17 +10,19 @@
 
 #include <uct/base/uct_md.h>
 #include <uct/cuda/base/cuda_md.h>
 #include <cuda.h>
+#include <ucs/config/types.h>
+#include <sys/types.h>
 
 /**
  * @brief cuda ipc MD descriptor
 */
 typedef struct uct_cuda_ipc_md {
-    struct uct_md super;   /**< Domain info */
-    CUuuid*       uuid_map;
-    char*         peer_accessible_cache;
-    int           uuid_map_size;
-    int           uuid_map_capacity;
+    struct uct_md             super;                 /**< Domain info */
+    CUuuid*                   uuid_map;
+    ucs_ternary_auto_value_t *peer_accessible_cache;
+    int                       uuid_map_size;
+    int                       uuid_map_capacity;
 } uct_cuda_ipc_md_t;
 
 /**
@@ -45,11 +47,12 @@ typedef struct uct_cuda_ipc_md_config {
  * @brief cuda_ipc packed and remote key for put/get
  */
 typedef struct uct_cuda_ipc_key {
-    CUipcMemHandle ph;           /* Memory handle of GPU memory */
-    CUdeviceptr    d_bptr;       /* Allocation base address */
-    size_t         b_len;        /* Allocation size */
-    int            dev_num;      /* GPU Device number */
-    CUuuid         uuid;         /* GPU Device UUID */
+    CUipcMemHandle ph;      /* Memory handle of GPU memory */
+    pid_t          pid;     /* PID as key to resolve peer_map hash */
+    CUdeviceptr    d_bptr;  /* Allocation base address */
+    size_t         b_len;   /* Allocation size */
+    int            dev_num; /* GPU Device number */
+    CUuuid         uuid;    /* GPU Device UUID */
 } uct_cuda_ipc_key_t;
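
For readers outside the UCX tree, the core of the change above is uct_cuda_ipc_get_remote_cache(): a process-global, lock-protected get-or-create table that hands every thread the same per-(pid, device) cache, so a handle opened during a reachability check stays valid for later put/get operations. The standalone C sketch below models only that pattern; it substitutes a fixed-size linear-probe table and a recursive pthread mutex for UCX's khash and ucs_recursive_spinlock_t, and every name in it (cache_table, remote_cache_lookup, entry_t) is hypothetical rather than UCX API.

/* Standalone sketch of a process-global get-or-create cache table keyed by
 * (pid, device). Simplified model of uct_cuda_ipc_get_remote_cache() above:
 * a fixed-size linear-probe table plus a recursive pthread mutex stand in
 * for UCX's khash and ucs_recursive_spinlock_t. All names are hypothetical. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>

#define TABLE_SIZE 256

typedef struct cache {
    char name[64];                  /* e.g. "dest:<pid>:<device>" */
} cache_t;

typedef struct entry {
    int      used;
    pid_t    pid;
    int      device;
    cache_t *cache;
} entry_t;

static entry_t         cache_table[TABLE_SIZE];
static pthread_mutex_t table_lock;

/* One-time setup; the patch does the equivalent in UCS_STATIC_INIT. */
static void table_init(void)
{
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);
    /* recursive, so a lookup nested under an already-locked section is safe */
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&table_lock, &attr);
    pthread_mutexattr_destroy(&attr);
}

/* Return the cache for (pid, device), creating it on first use -- the same
 * get-or-create step that kh_put()/kh_val() perform in the patch. */
static cache_t *remote_cache_lookup(pid_t pid, int device)
{
    unsigned slot   = (((unsigned)pid << 8) | (unsigned)device) % TABLE_SIZE;
    cache_t *result = NULL;
    unsigned i, idx;

    pthread_mutex_lock(&table_lock);
    for (i = 0; i < TABLE_SIZE; i++) {
        idx = (slot + i) % TABLE_SIZE;
        if (cache_table[idx].used && (cache_table[idx].pid == pid) &&
            (cache_table[idx].device == device)) {
            result = cache_table[idx].cache;     /* hit: all callers share it */
            break;
        }
        if (!cache_table[idx].used) {            /* miss: create and insert */
            result = calloc(1, sizeof(*result));
            if (result != NULL) {
                snprintf(result->name, sizeof(result->name), "dest:%d:%d",
                         (int)pid, device);
                cache_table[idx].used   = 1;
                cache_table[idx].pid    = pid;
                cache_table[idx].device = device;
                cache_table[idx].cache  = result;
            }
            break;
        }
    }
    pthread_mutex_unlock(&table_lock);
    return result;
}

int main(void)
{
    cache_t *a, *b;

    table_init();
    a = remote_cache_lookup(1234, 0);
    b = remote_cache_lookup(1234, 0);
    if ((a != NULL) && (b != NULL)) {
        printf("%s shared=%d\n", a->name, a == b);  /* dest:1234:0 shared=1 */
    }
    return 0;
}

The recursive lock mirrors the patch's choice of ucs_recursive_spinlock_t: a lookup may run nested inside a code path that already holds the global lock, and a non-recursive lock would self-deadlock there.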
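
A second detail worth modeling is the peer_accessible_cache change in cuda_ipc_md.c: entries become enum-typed ucs_ternary_auto_value_t values instead of chars, so the old memset(0xFF) sentinel no longer works and new entries are filled explicitly with UCS_TRY. The minimal model below shows that three-state lazy probe; probe() is a hypothetical stand-in for the cuIpcOpenMemHandle-based check done through uct_cuda_ipc_map_memhandle().

/* Minimal model of the ternary reachability cache above: entries start out
 * as TRY (unchecked) and are resolved to YES/NO by a single probe, after
 * which the cached answer is served. probe() is a hypothetical stand-in for
 * opening the IPC handle via uct_cuda_ipc_map_memhandle(). */
#include <stdio.h>

typedef enum { NO = 0, YES = 1, TRY = 2 } ternary_t;

#define NUM_PEERS 16

static ternary_t accessible_cache[NUM_PEERS];

static int probe(int peer)
{
    return (peer % 2) == 0;            /* pretend even-numbered peers map OK */
}

static int peer_reachable(int peer)
{
    ternary_t *acc = &accessible_cache[peer];

    if (*acc == TRY) {                 /* unchecked: probe once, then cache */
        *acc = probe(peer) ? YES : NO;
    }
    return *acc == YES;
}

int main(void)
{
    int i;

    for (i = 0; i < NUM_PEERS; i++) {
        accessible_cache[i] = TRY;     /* like the UCS_TRY fill loop above */
    }
    printf("peer0:%d peer1:%d\n", peer_reachable(0), peer_reachable(1));
    return 0;
}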