From 778d47b0a10eb5a793616bc2e672350476f755f1 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Sat, 6 May 2017 00:50:41 +0300 Subject: [PATCH] UCS/RCACHE: Fix rcache merge while mappings are modified. When merging with existing memory regions, it may already be released by another thread, or even triggered by releasing some on rcache internal structures. It's not always possible to detect this release on time before attempting to register the region. Therefore, let the memory registration callback report the error, and retry the registration with the original address range (which has to be valid). --- src/ucs/sys/rcache.c | 31 ++++++++++++++------- src/ucs/sys/rcache.h | 6 ++++ src/uct/ib/base/ib_md.c | 19 +++++++------ test/gtest/ucs/test_rcache.cc | 20 ++++++++++++++ test/gtest/uct/test_pd.cc | 52 +++++++++++++++++++++++++++++++++++ 5 files changed, 109 insertions(+), 19 deletions(-) diff --git a/src/ucs/sys/rcache.c b/src/ucs/sys/rcache.c index b2a72be75ec..a9b0ae0e795 100644 --- a/src/ucs/sys/rcache.c +++ b/src/ucs/sys/rcache.c @@ -297,7 +297,7 @@ static inline int ucs_rcache_region_test(ucs_rcache_region_t *region, int prot) /* Lock must be held */ static ucs_status_t ucs_rcache_check_overlap(ucs_rcache_t *rcache, ucs_pgt_addr_t *start, - ucs_pgt_addr_t *end, int *prot, + ucs_pgt_addr_t *end, int *prot, int *merged, ucs_rcache_region_t **region_p) { ucs_rcache_region_t *region, *tmp; @@ -370,8 +370,9 @@ ucs_rcache_check_overlap(ucs_rcache_t *rcache, ucs_pgt_addr_t *start, ucs_rcache_region_trace(rcache, region, "merge 0x%lx..0x%lx "UCS_RCACHE_PROT_FMT" with", *start, *end, UCS_RCACHE_PROT_ARG(*prot)); - *start = ucs_min(*start, region->super.start); - *end = ucs_max(*end, region->super.end); + *start = ucs_min(*start, region->super.start); + *end = ucs_max(*end, region->super.end); + *merged = 1; ucs_rcache_region_invalidate(rcache, region, 1, 0); } return UCS_OK; @@ -384,12 +385,14 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length, ucs_rcache_region_t *region = NULL; ucs_pgt_addr_t start, end; ucs_status_t status; + int merged; ucs_trace_func("rcache=%s, address=%p, length=%zu", rcache->name, address, length); pthread_rwlock_wrlock(&rcache->lock); +retry: /* Align to page size */ start = ucs_align_down_pow2((uintptr_t)address, rcache->params.alignment); @@ -397,8 +400,9 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length, rcache->params.alignment); /* Check overlap with existing regions */ + merged = 0; status = UCS_PROFILE_CALL(ucs_rcache_check_overlap, rcache, &start, &end, - &prot, ®ion); + &prot, &merged, ®ion); if (status == UCS_ERR_ALREADY_EXISTS) { /* Found a matching region (it could have been added after we released * the lock) @@ -442,12 +446,19 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length, UCS_PROFILE_NAMED_CALL("mem_reg", rcache->params.ops->mem_reg, rcache->params.context, rcache, arg, region); if (status != UCS_OK) { - /* In case region is not registered, we don't return it to the user, - * so no need to increment reference count. - */ - ucs_rcache_region_debug(rcache, region, "created with status %s", - ucs_status_string(status)); - goto out_unlock; + if (merged) { + /* failure may be due to merge, retry with original address */ + ucs_debug("failed to register merged region " UCS_PGT_REGION_FMT ": %s, retrying", + UCS_PGT_REGION_ARG(®ion->super), ucs_status_string(status)); + status = ucs_pgtable_remove(&rcache->pgtable, ®ion->super); + ucs_assert_always(status == UCS_OK); + ucs_mem_region_destroy_internal(rcache, region); + goto retry; + } else { + ucs_warn("failed to register region " UCS_PGT_REGION_FMT ": %s", + UCS_PGT_REGION_ARG(®ion->super), ucs_status_string(status)); + goto out_unlock; + } } region->flags |= UCS_RCACHE_REGION_FLAG_REGISTERED; diff --git a/src/ucs/sys/rcache.h b/src/ucs/sys/rcache.h index f7dbf163361..c55463a5649 100644 --- a/src/ucs/sys/rcache.h +++ b/src/ucs/sys/rcache.h @@ -50,6 +50,12 @@ struct ucs_rcache_ops { * `region_struct_size' in @ref ucs_rcache_params. * This function may store relevant information (such * as memory keys) inside the larger structure. + * + * @return UCS_OK if registration is successful, error otherwise. + * + * @note This function should be able to handle inaccessible memory addresses + * and return error status in this case, without any destructive consequences + * such as error messages or fatal failure. */ ucs_status_t (*mem_reg)(void *context, ucs_rcache_t *rcache, void *arg, ucs_rcache_region_t *region); diff --git a/src/uct/ib/base/ib_md.c b/src/uct/ib/base/ib_md.c index fd7b6cd8ed6..7b55e3673e0 100644 --- a/src/uct/ib/base/ib_md.c +++ b/src/uct/ib/base/ib_md.c @@ -325,8 +325,9 @@ uint8_t uct_ib_md_get_atomic_mr_id(uct_ib_md_t *md) static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address, size_t length, uint64_t exp_access, - struct ibv_mr **mr_p) + int silent, struct ibv_mr **mr_p) { + ucs_log_level_t level = silent ? UCS_LOG_LEVEL_DEBUG : UCS_LOG_LEVEL_ERROR; struct ibv_mr *mr; if (exp_access) { @@ -341,8 +342,8 @@ static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address, mr = UCS_PROFILE_CALL(ibv_exp_reg_mr, &in); if (mr == NULL) { - ucs_error("ibv_exp_reg_mr(address=%p, length=%Zu, exp_access=0x%lx) failed: %m", - in.addr, in.length, in.exp_access); + ucs_log(level, "ibv_exp_reg_mr(address=%p, length=%Zu, exp_access=0x%lx) failed: %m", + in.addr, in.length, in.exp_access); return UCS_ERR_IO_ERROR; } #else @@ -352,7 +353,7 @@ static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address, mr = UCS_PROFILE_CALL(ibv_reg_mr, md->pd, address, length, UCT_IB_MEM_ACCESS_FLAGS); if (mr == NULL) { - ucs_error("ibv_reg_mr(address=%p, length=%Zu, access=0x%x) failed: %m", + ucs_log(level, "ibv_reg_mr(address=%p, length=%Zu, access=0x%x) failed: %m", address, length, UCT_IB_MEM_ACCESS_FLAGS); return UCS_ERR_IO_ERROR; } @@ -677,7 +678,7 @@ static ucs_status_t uct_ib_mem_alloc(uct_md_h uct_md, size_t *length_p, length = ucs_memtrack_adjust_alloc_size(*length_p); exp_access = uct_ib_md_access_flags(md, flags, length) | IBV_EXP_ACCESS_ALLOCATE_MR; - status = uct_ib_md_reg_mr(md, NULL, length, exp_access, &memh->mr); + status = uct_ib_md_reg_mr(md, NULL, length, exp_access, 0, &memh->mr); if (status != UCS_OK) { goto err_free_memh; } @@ -727,14 +728,14 @@ static ucs_status_t uct_ib_mem_free(uct_md_h md, uct_mem_h memh) static ucs_status_t uct_ib_mem_reg_internal(uct_md_h uct_md, void *address, size_t length, unsigned flags, - uct_ib_mem_t *memh) + int silent, uct_ib_mem_t *memh) { uct_ib_md_t *md = ucs_derived_of(uct_md, uct_ib_md_t); ucs_status_t status; uint64_t exp_access; exp_access = uct_ib_md_access_flags(md, flags, length); - status = uct_ib_md_reg_mr(md, address, length, exp_access, &memh->mr); + status = uct_ib_md_reg_mr(md, address, length, exp_access, silent, &memh->mr); if (status != UCS_OK) { return status; } @@ -764,7 +765,7 @@ static ucs_status_t uct_ib_mem_reg(uct_md_h uct_md, void *address, size_t length return UCS_ERR_NO_MEMORY; } - status = uct_ib_mem_reg_internal(uct_md, address, length, flags, memh); + status = uct_ib_mem_reg_internal(uct_md, address, length, flags, 0, memh); if (status != UCS_OK) { uct_ib_memh_free(memh); return status; @@ -916,7 +917,7 @@ static ucs_status_t uct_ib_rcache_mem_reg_cb(void *context, ucs_rcache_t *rcache status = uct_ib_mem_reg_internal(&md->super, (void*)region->super.super.start, region->super.super.end - region->super.super.start, - *flags, ®ion->memh); + *flags, 1, ®ion->memh); if (status != UCS_OK) { return status; } diff --git a/test/gtest/ucs/test_rcache.cc b/test/gtest/ucs/test_rcache.cc index 98ba99554fa..3484d70b3b6 100644 --- a/test/gtest/ucs/test_rcache.cc +++ b/test/gtest/ucs/test_rcache.cc @@ -460,17 +460,37 @@ class test_rcache_no_register : public test_rcache { virtual ucs_status_t mem_reg(region *region) { return UCS_ERR_IO_ERROR; } + + static ucs_log_func_rc_t + log_handler(const char *file, unsigned line, const char *function, + ucs_log_level_t level, const char *prefix, const char *message, + va_list ap) + { + /* Ignore warnings about empty memory pool */ + if ((level == UCS_LOG_LEVEL_WARN) && strstr(message, "failed to register")) { + char buf[ucs_global_opts.log_buffer_size]; + vsnprintf(buf, sizeof(buf), message, ap); + UCS_TEST_MESSAGE << "" << buf; + return UCS_LOG_FUNC_RC_STOP; + } + + return UCS_LOG_FUNC_RC_CONTINUE; + } }; UCS_MT_TEST_F(test_rcache_no_register, register_failure, 10) { static const size_t size = 1 * 1024 * 1024; void *ptr = malloc(size); + ucs_log_push_handler(log_handler); + ucs_status_t status; ucs_rcache_region_t *r; status = ucs_rcache_get(m_rcache, ptr, size, PROT_READ|PROT_WRITE, NULL, &r); EXPECT_EQ(UCS_ERR_IO_ERROR, status); EXPECT_EQ(0u, m_reg_count); + ucs_log_pop_handler(); + free(ptr); } diff --git a/test/gtest/uct/test_pd.cc b/test/gtest/uct/test_pd.cc index 98e90966297..78771d42021 100644 --- a/test/gtest/uct/test_pd.cc +++ b/test/gtest/uct/test_pd.cc @@ -32,6 +32,21 @@ class test_pd : public testing::TestWithParam, return m_pd; } + static void* alloc_thread(void *arg) + { + volatile int *stop_flag = (int*)arg; + + while (!*stop_flag) { + int count = rand() % 100; + std::vector buffers; + for (int i = 0; i < count; ++i) { + buffers.push_back(malloc(rand() % (256*1024))); + } + std::for_each(buffers.begin(), buffers.end(), free); + } + return NULL; + } + private: ucs::handle m_md_config; ucs::handle m_pd; @@ -258,6 +273,43 @@ UCS_TEST_P(test_pd, alloc_advise) { uct_md_mem_free(pd(), memh); } +/* + * reproduce issue #1284, main thread is registering memory while another thread + * allocates and releases memory. + */ +UCS_TEST_P(test_pd, reg_multi_thread) { + ucs_status_t status; + + check_caps(UCT_MD_FLAG_REG, "registration"); + + pthread_t thread_id; + int stop_flag = 0; + pthread_create(&thread_id, NULL, alloc_thread, &stop_flag); + + ucs_time_t start_time = ucs_get_time(); + while (ucs_get_time() - start_time < ucs_time_from_sec(0.5)) { + const size_t size = (rand() % 65536) + 1; + + void *buffer = malloc(size); + ASSERT_TRUE(buffer != NULL); + + uct_mem_h memh; + status = uct_md_mem_reg(pd(), buffer, size, UCT_MD_MEM_FLAG_NONBLOCK, &memh); + ASSERT_UCS_OK(status, << " buffer=" << buffer << " size=" << size); + ASSERT_TRUE(memh != UCT_MEM_HANDLE_NULL); + + sched_yield(); + + status = uct_md_mem_dereg(pd(), memh); + EXPECT_UCS_OK(status); + free(buffer); + } + + stop_flag = 1; + pthread_join(thread_id, NULL); +} + + #define UCT_PD_INSTANTIATE_TEST_CASE(_test_case) \ UCS_PP_FOREACH(_UCT_PD_INSTANTIATE_TEST_CASE, _test_case, \ knem, \