diff --git a/src/ucs/sys/rcache.c b/src/ucs/sys/rcache.c index b2a72be75ec..a9b0ae0e795 100644 --- a/src/ucs/sys/rcache.c +++ b/src/ucs/sys/rcache.c @@ -297,7 +297,7 @@ static inline int ucs_rcache_region_test(ucs_rcache_region_t *region, int prot) /* Lock must be held */ static ucs_status_t ucs_rcache_check_overlap(ucs_rcache_t *rcache, ucs_pgt_addr_t *start, - ucs_pgt_addr_t *end, int *prot, + ucs_pgt_addr_t *end, int *prot, int *merged, ucs_rcache_region_t **region_p) { ucs_rcache_region_t *region, *tmp; @@ -370,8 +370,9 @@ ucs_rcache_check_overlap(ucs_rcache_t *rcache, ucs_pgt_addr_t *start, ucs_rcache_region_trace(rcache, region, "merge 0x%lx..0x%lx "UCS_RCACHE_PROT_FMT" with", *start, *end, UCS_RCACHE_PROT_ARG(*prot)); - *start = ucs_min(*start, region->super.start); - *end = ucs_max(*end, region->super.end); + *start = ucs_min(*start, region->super.start); + *end = ucs_max(*end, region->super.end); + *merged = 1; ucs_rcache_region_invalidate(rcache, region, 1, 0); } return UCS_OK; @@ -384,12 +385,14 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length, ucs_rcache_region_t *region = NULL; ucs_pgt_addr_t start, end; ucs_status_t status; + int merged; ucs_trace_func("rcache=%s, address=%p, length=%zu", rcache->name, address, length); pthread_rwlock_wrlock(&rcache->lock); +retry: /* Align to page size */ start = ucs_align_down_pow2((uintptr_t)address, rcache->params.alignment); @@ -397,8 +400,9 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length, rcache->params.alignment); /* Check overlap with existing regions */ + merged = 0; status = UCS_PROFILE_CALL(ucs_rcache_check_overlap, rcache, &start, &end, - &prot, ®ion); + &prot, &merged, ®ion); if (status == UCS_ERR_ALREADY_EXISTS) { /* Found a matching region (it could have been added after we released * the lock) @@ -442,12 +446,19 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length, UCS_PROFILE_NAMED_CALL("mem_reg", rcache->params.ops->mem_reg, rcache->params.context, rcache, arg, region); if (status != UCS_OK) { - /* In case region is not registered, we don't return it to the user, - * so no need to increment reference count. - */ - ucs_rcache_region_debug(rcache, region, "created with status %s", - ucs_status_string(status)); - goto out_unlock; + if (merged) { + /* failure may be due to merge, retry with original address */ + ucs_debug("failed to register merged region " UCS_PGT_REGION_FMT ": %s, retrying", + UCS_PGT_REGION_ARG(®ion->super), ucs_status_string(status)); + status = ucs_pgtable_remove(&rcache->pgtable, ®ion->super); + ucs_assert_always(status == UCS_OK); + ucs_mem_region_destroy_internal(rcache, region); + goto retry; + } else { + ucs_warn("failed to register region " UCS_PGT_REGION_FMT ": %s", + UCS_PGT_REGION_ARG(®ion->super), ucs_status_string(status)); + goto out_unlock; + } } region->flags |= UCS_RCACHE_REGION_FLAG_REGISTERED; diff --git a/src/ucs/sys/rcache.h b/src/ucs/sys/rcache.h index f7dbf163361..c55463a5649 100644 --- a/src/ucs/sys/rcache.h +++ b/src/ucs/sys/rcache.h @@ -50,6 +50,12 @@ struct ucs_rcache_ops { * `region_struct_size' in @ref ucs_rcache_params. * This function may store relevant information (such * as memory keys) inside the larger structure. + * + * @return UCS_OK if registration is successful, error otherwise. + * + * @note This function should be able to handle inaccessible memory addresses + * and return error status in this case, without any destructive consequences + * such as error messages or fatal failure. */ ucs_status_t (*mem_reg)(void *context, ucs_rcache_t *rcache, void *arg, ucs_rcache_region_t *region); diff --git a/src/uct/ib/base/ib_md.c b/src/uct/ib/base/ib_md.c index fd7b6cd8ed6..7b55e3673e0 100644 --- a/src/uct/ib/base/ib_md.c +++ b/src/uct/ib/base/ib_md.c @@ -325,8 +325,9 @@ uint8_t uct_ib_md_get_atomic_mr_id(uct_ib_md_t *md) static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address, size_t length, uint64_t exp_access, - struct ibv_mr **mr_p) + int silent, struct ibv_mr **mr_p) { + ucs_log_level_t level = silent ? UCS_LOG_LEVEL_DEBUG : UCS_LOG_LEVEL_ERROR; struct ibv_mr *mr; if (exp_access) { @@ -341,8 +342,8 @@ static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address, mr = UCS_PROFILE_CALL(ibv_exp_reg_mr, &in); if (mr == NULL) { - ucs_error("ibv_exp_reg_mr(address=%p, length=%Zu, exp_access=0x%lx) failed: %m", - in.addr, in.length, in.exp_access); + ucs_log(level, "ibv_exp_reg_mr(address=%p, length=%Zu, exp_access=0x%lx) failed: %m", + in.addr, in.length, in.exp_access); return UCS_ERR_IO_ERROR; } #else @@ -352,7 +353,7 @@ static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address, mr = UCS_PROFILE_CALL(ibv_reg_mr, md->pd, address, length, UCT_IB_MEM_ACCESS_FLAGS); if (mr == NULL) { - ucs_error("ibv_reg_mr(address=%p, length=%Zu, access=0x%x) failed: %m", + ucs_log(level, "ibv_reg_mr(address=%p, length=%Zu, access=0x%x) failed: %m", address, length, UCT_IB_MEM_ACCESS_FLAGS); return UCS_ERR_IO_ERROR; } @@ -677,7 +678,7 @@ static ucs_status_t uct_ib_mem_alloc(uct_md_h uct_md, size_t *length_p, length = ucs_memtrack_adjust_alloc_size(*length_p); exp_access = uct_ib_md_access_flags(md, flags, length) | IBV_EXP_ACCESS_ALLOCATE_MR; - status = uct_ib_md_reg_mr(md, NULL, length, exp_access, &memh->mr); + status = uct_ib_md_reg_mr(md, NULL, length, exp_access, 0, &memh->mr); if (status != UCS_OK) { goto err_free_memh; } @@ -727,14 +728,14 @@ static ucs_status_t uct_ib_mem_free(uct_md_h md, uct_mem_h memh) static ucs_status_t uct_ib_mem_reg_internal(uct_md_h uct_md, void *address, size_t length, unsigned flags, - uct_ib_mem_t *memh) + int silent, uct_ib_mem_t *memh) { uct_ib_md_t *md = ucs_derived_of(uct_md, uct_ib_md_t); ucs_status_t status; uint64_t exp_access; exp_access = uct_ib_md_access_flags(md, flags, length); - status = uct_ib_md_reg_mr(md, address, length, exp_access, &memh->mr); + status = uct_ib_md_reg_mr(md, address, length, exp_access, silent, &memh->mr); if (status != UCS_OK) { return status; } @@ -764,7 +765,7 @@ static ucs_status_t uct_ib_mem_reg(uct_md_h uct_md, void *address, size_t length return UCS_ERR_NO_MEMORY; } - status = uct_ib_mem_reg_internal(uct_md, address, length, flags, memh); + status = uct_ib_mem_reg_internal(uct_md, address, length, flags, 0, memh); if (status != UCS_OK) { uct_ib_memh_free(memh); return status; @@ -916,7 +917,7 @@ static ucs_status_t uct_ib_rcache_mem_reg_cb(void *context, ucs_rcache_t *rcache status = uct_ib_mem_reg_internal(&md->super, (void*)region->super.super.start, region->super.super.end - region->super.super.start, - *flags, ®ion->memh); + *flags, 1, ®ion->memh); if (status != UCS_OK) { return status; } diff --git a/test/gtest/ucs/test_rcache.cc b/test/gtest/ucs/test_rcache.cc index 98ba99554fa..3484d70b3b6 100644 --- a/test/gtest/ucs/test_rcache.cc +++ b/test/gtest/ucs/test_rcache.cc @@ -460,17 +460,37 @@ class test_rcache_no_register : public test_rcache { virtual ucs_status_t mem_reg(region *region) { return UCS_ERR_IO_ERROR; } + + static ucs_log_func_rc_t + log_handler(const char *file, unsigned line, const char *function, + ucs_log_level_t level, const char *prefix, const char *message, + va_list ap) + { + /* Ignore warnings about empty memory pool */ + if ((level == UCS_LOG_LEVEL_WARN) && strstr(message, "failed to register")) { + char buf[ucs_global_opts.log_buffer_size]; + vsnprintf(buf, sizeof(buf), message, ap); + UCS_TEST_MESSAGE << "" << buf; + return UCS_LOG_FUNC_RC_STOP; + } + + return UCS_LOG_FUNC_RC_CONTINUE; + } }; UCS_MT_TEST_F(test_rcache_no_register, register_failure, 10) { static const size_t size = 1 * 1024 * 1024; void *ptr = malloc(size); + ucs_log_push_handler(log_handler); + ucs_status_t status; ucs_rcache_region_t *r; status = ucs_rcache_get(m_rcache, ptr, size, PROT_READ|PROT_WRITE, NULL, &r); EXPECT_EQ(UCS_ERR_IO_ERROR, status); EXPECT_EQ(0u, m_reg_count); + ucs_log_pop_handler(); + free(ptr); } diff --git a/test/gtest/uct/test_pd.cc b/test/gtest/uct/test_pd.cc index 98e90966297..78771d42021 100644 --- a/test/gtest/uct/test_pd.cc +++ b/test/gtest/uct/test_pd.cc @@ -32,6 +32,21 @@ class test_pd : public testing::TestWithParam, return m_pd; } + static void* alloc_thread(void *arg) + { + volatile int *stop_flag = (int*)arg; + + while (!*stop_flag) { + int count = rand() % 100; + std::vector buffers; + for (int i = 0; i < count; ++i) { + buffers.push_back(malloc(rand() % (256*1024))); + } + std::for_each(buffers.begin(), buffers.end(), free); + } + return NULL; + } + private: ucs::handle m_md_config; ucs::handle m_pd; @@ -258,6 +273,43 @@ UCS_TEST_P(test_pd, alloc_advise) { uct_md_mem_free(pd(), memh); } +/* + * reproduce issue #1284, main thread is registering memory while another thread + * allocates and releases memory. + */ +UCS_TEST_P(test_pd, reg_multi_thread) { + ucs_status_t status; + + check_caps(UCT_MD_FLAG_REG, "registration"); + + pthread_t thread_id; + int stop_flag = 0; + pthread_create(&thread_id, NULL, alloc_thread, &stop_flag); + + ucs_time_t start_time = ucs_get_time(); + while (ucs_get_time() - start_time < ucs_time_from_sec(0.5)) { + const size_t size = (rand() % 65536) + 1; + + void *buffer = malloc(size); + ASSERT_TRUE(buffer != NULL); + + uct_mem_h memh; + status = uct_md_mem_reg(pd(), buffer, size, UCT_MD_MEM_FLAG_NONBLOCK, &memh); + ASSERT_UCS_OK(status, << " buffer=" << buffer << " size=" << size); + ASSERT_TRUE(memh != UCT_MEM_HANDLE_NULL); + + sched_yield(); + + status = uct_md_mem_dereg(pd(), memh); + EXPECT_UCS_OK(status); + free(buffer); + } + + stop_flag = 1; + pthread_join(thread_id, NULL); +} + + #define UCT_PD_INSTANTIATE_TEST_CASE(_test_case) \ UCS_PP_FOREACH(_UCT_PD_INSTANTIATE_TEST_CASE, _test_case, \ knem, \