Skip to content

Commit

Permalink
UCS/RCACHE: Fix rcache merge while mappings are modified.
Browse files Browse the repository at this point in the history
When merging with existing memory regions, their memory may already have been
released by another thread, or the release may even be triggered by freeing
some of the rcache internal structures. It's not always possible to detect
this release in time, before attempting to register the region. Therefore, let
the memory registration callback report the error, and retry the registration
with the original address range (which has to be valid).
  • Loading branch information
yosefe committed May 7, 2017
1 parent 4ffe1f2 commit 778d47b
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 19 deletions.
31 changes: 21 additions & 10 deletions src/ucs/sys/rcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ static inline int ucs_rcache_region_test(ucs_rcache_region_t *region, int prot)
/* Lock must be held */
static ucs_status_t
ucs_rcache_check_overlap(ucs_rcache_t *rcache, ucs_pgt_addr_t *start,
ucs_pgt_addr_t *end, int *prot,
ucs_pgt_addr_t *end, int *prot, int *merged,
ucs_rcache_region_t **region_p)
{
ucs_rcache_region_t *region, *tmp;
Expand Down Expand Up @@ -370,8 +370,9 @@ ucs_rcache_check_overlap(ucs_rcache_t *rcache, ucs_pgt_addr_t *start,
ucs_rcache_region_trace(rcache, region,
"merge 0x%lx..0x%lx "UCS_RCACHE_PROT_FMT" with",
*start, *end, UCS_RCACHE_PROT_ARG(*prot));
*start = ucs_min(*start, region->super.start);
*end = ucs_max(*end, region->super.end);
*start = ucs_min(*start, region->super.start);
*end = ucs_max(*end, region->super.end);
*merged = 1;
ucs_rcache_region_invalidate(rcache, region, 1, 0);
}
return UCS_OK;
Expand All @@ -384,21 +385,24 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length,
ucs_rcache_region_t *region = NULL;
ucs_pgt_addr_t start, end;
ucs_status_t status;
int merged;

ucs_trace_func("rcache=%s, address=%p, length=%zu", rcache->name, address,
length);

pthread_rwlock_wrlock(&rcache->lock);

retry:
/* Align to page size */
start = ucs_align_down_pow2((uintptr_t)address,
rcache->params.alignment);
end = ucs_align_up_pow2 ((uintptr_t)address + length,
rcache->params.alignment);

/* Check overlap with existing regions */
merged = 0;
status = UCS_PROFILE_CALL(ucs_rcache_check_overlap, rcache, &start, &end,
&prot, &region);
&prot, &merged, &region);
if (status == UCS_ERR_ALREADY_EXISTS) {
/* Found a matching region (it could have been added after we released
* the lock)
Expand Down Expand Up @@ -442,12 +446,19 @@ ucs_rcache_create_region(ucs_rcache_t *rcache, void *address, size_t length,
UCS_PROFILE_NAMED_CALL("mem_reg", rcache->params.ops->mem_reg,
rcache->params.context, rcache, arg, region);
if (status != UCS_OK) {
/* In case region is not registered, we don't return it to the user,
* so no need to increment reference count.
*/
ucs_rcache_region_debug(rcache, region, "created with status %s",
ucs_status_string(status));
goto out_unlock;
if (merged) {
/* failure may be due to merge, retry with original address */
ucs_debug("failed to register merged region " UCS_PGT_REGION_FMT ": %s, retrying",
UCS_PGT_REGION_ARG(&region->super), ucs_status_string(status));
status = ucs_pgtable_remove(&rcache->pgtable, &region->super);
ucs_assert_always(status == UCS_OK);
ucs_mem_region_destroy_internal(rcache, region);
goto retry;
} else {
ucs_warn("failed to register region " UCS_PGT_REGION_FMT ": %s",
UCS_PGT_REGION_ARG(&region->super), ucs_status_string(status));
goto out_unlock;
}
}

region->flags |= UCS_RCACHE_REGION_FLAG_REGISTERED;
Expand Down
6 changes: 6 additions & 0 deletions src/ucs/sys/rcache.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ struct ucs_rcache_ops {
* `region_struct_size' in @ref ucs_rcache_params.
* This function may store relevant information (such
* as memory keys) inside the larger structure.
*
* @return UCS_OK if registration is successful, error otherwise.
*
* @note This function should be able to handle inaccessible memory addresses
* and return error status in this case, without any destructive consequences
* such as error messages or fatal failure.
*/
ucs_status_t (*mem_reg)(void *context, ucs_rcache_t *rcache,
void *arg, ucs_rcache_region_t *region);
Expand Down
19 changes: 10 additions & 9 deletions src/uct/ib/base/ib_md.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,9 @@ uint8_t uct_ib_md_get_atomic_mr_id(uct_ib_md_t *md)

static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address,
size_t length, uint64_t exp_access,
struct ibv_mr **mr_p)
int silent, struct ibv_mr **mr_p)
{
ucs_log_level_t level = silent ? UCS_LOG_LEVEL_DEBUG : UCS_LOG_LEVEL_ERROR;
struct ibv_mr *mr;

if (exp_access) {
Expand All @@ -341,8 +342,8 @@ static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address,

mr = UCS_PROFILE_CALL(ibv_exp_reg_mr, &in);
if (mr == NULL) {
ucs_error("ibv_exp_reg_mr(address=%p, length=%Zu, exp_access=0x%lx) failed: %m",
in.addr, in.length, in.exp_access);
ucs_log(level, "ibv_exp_reg_mr(address=%p, length=%Zu, exp_access=0x%lx) failed: %m",
in.addr, in.length, in.exp_access);
return UCS_ERR_IO_ERROR;
}
#else
Expand All @@ -352,7 +353,7 @@ static ucs_status_t uct_ib_md_reg_mr(uct_ib_md_t *md, void *address,
mr = UCS_PROFILE_CALL(ibv_reg_mr, md->pd, address, length,
UCT_IB_MEM_ACCESS_FLAGS);
if (mr == NULL) {
ucs_error("ibv_reg_mr(address=%p, length=%Zu, access=0x%x) failed: %m",
ucs_log(level, "ibv_reg_mr(address=%p, length=%Zu, access=0x%x) failed: %m",
address, length, UCT_IB_MEM_ACCESS_FLAGS);
return UCS_ERR_IO_ERROR;
}
Expand Down Expand Up @@ -677,7 +678,7 @@ static ucs_status_t uct_ib_mem_alloc(uct_md_h uct_md, size_t *length_p,
length = ucs_memtrack_adjust_alloc_size(*length_p);
exp_access = uct_ib_md_access_flags(md, flags, length) |
IBV_EXP_ACCESS_ALLOCATE_MR;
status = uct_ib_md_reg_mr(md, NULL, length, exp_access, &memh->mr);
status = uct_ib_md_reg_mr(md, NULL, length, exp_access, 0, &memh->mr);
if (status != UCS_OK) {
goto err_free_memh;
}
Expand Down Expand Up @@ -727,14 +728,14 @@ static ucs_status_t uct_ib_mem_free(uct_md_h md, uct_mem_h memh)

static ucs_status_t uct_ib_mem_reg_internal(uct_md_h uct_md, void *address,
size_t length, unsigned flags,
uct_ib_mem_t *memh)
int silent, uct_ib_mem_t *memh)
{
uct_ib_md_t *md = ucs_derived_of(uct_md, uct_ib_md_t);
ucs_status_t status;
uint64_t exp_access;

exp_access = uct_ib_md_access_flags(md, flags, length);
status = uct_ib_md_reg_mr(md, address, length, exp_access, &memh->mr);
status = uct_ib_md_reg_mr(md, address, length, exp_access, silent, &memh->mr);
if (status != UCS_OK) {
return status;
}
Expand Down Expand Up @@ -764,7 +765,7 @@ static ucs_status_t uct_ib_mem_reg(uct_md_h uct_md, void *address, size_t length
return UCS_ERR_NO_MEMORY;
}

status = uct_ib_mem_reg_internal(uct_md, address, length, flags, memh);
status = uct_ib_mem_reg_internal(uct_md, address, length, flags, 0, memh);
if (status != UCS_OK) {
uct_ib_memh_free(memh);
return status;
Expand Down Expand Up @@ -916,7 +917,7 @@ static ucs_status_t uct_ib_rcache_mem_reg_cb(void *context, ucs_rcache_t *rcache

status = uct_ib_mem_reg_internal(&md->super, (void*)region->super.super.start,
region->super.super.end - region->super.super.start,
*flags, &region->memh);
*flags, 1, &region->memh);
if (status != UCS_OK) {
return status;
}
Expand Down
20 changes: 20 additions & 0 deletions test/gtest/ucs/test_rcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,17 +460,37 @@ class test_rcache_no_register : public test_rcache {
/* Registration hook of this fixture: unconditionally reports
 * UCS_ERR_IO_ERROR, so every registration attempt fails. */
virtual ucs_status_t mem_reg(region *region) {
return UCS_ERR_IO_ERROR;
}

/*
 * Log handler used while a registration failure is expected.
 *
 * A warning whose format string contains "failed to register" is formatted
 * and printed as a test message, and UCS_LOG_FUNC_RC_STOP is returned for it.
 * For any other message UCS_LOG_FUNC_RC_CONTINUE is returned.
 */
static ucs_log_func_rc_t
log_handler(const char *file, unsigned line, const char *function,
            ucs_log_level_t level, const char *prefix, const char *message,
            va_list ap)
{
    const bool expected_warning = (level == UCS_LOG_LEVEL_WARN) &&
                                  (strstr(message, "failed to register") != NULL);
    if (!expected_warning) {
        return UCS_LOG_FUNC_RC_CONTINUE;
    }

    /* Expected registration-failure warning: show it as a test message */
    char text[ucs_global_opts.log_buffer_size];
    vsnprintf(text, sizeof(text), message, ap);
    UCS_TEST_MESSAGE << "" << text;
    return UCS_LOG_FUNC_RC_STOP;
}
};

/*
 * The fixture's mem_reg() always returns UCS_ERR_IO_ERROR, so ucs_rcache_get()
 * is expected to return that error and the registration count must stay zero.
 */
UCS_MT_TEST_F(test_rcache_no_register, register_failure, 10) {
    static const size_t size = 1 * 1024 * 1024;
    ucs_rcache_region_t *r;
    void *ptr = malloc(size);

    /* Route the expected "failed to register" warning through log_handler */
    ucs_log_push_handler(log_handler);

    ucs_status_t status = ucs_rcache_get(m_rcache, ptr, size,
                                         PROT_READ|PROT_WRITE, NULL, &r);
    EXPECT_EQ(UCS_ERR_IO_ERROR, status);
    EXPECT_EQ(0u, m_reg_count);

    ucs_log_pop_handler();
    free(ptr);
}
52 changes: 52 additions & 0 deletions test/gtest/uct/test_pd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ class test_pd : public testing::TestWithParam<std::string>,
return m_pd;
}

/*
 * Thread routine that generates allocator activity.
 *
 * @param arg  Pointer to an int stop flag; the routine keeps running while the
 *             flag reads as zero.
 *
 * Each round allocates a random batch of fewer than 100 buffers, each smaller
 * than 256KB, and then frees the whole batch in allocation order.
 *
 * @return Always NULL.
 */
static void* alloc_thread(void *arg)
{
    volatile int *stop_requested = (int*)arg;

    for (;;) {
        if (*stop_requested) {
            break;
        }

        const int batch_size = rand() % 100;
        std::vector<void*> batch;
        while ((int)batch.size() < batch_size) {
            batch.push_back(malloc(rand() % (256*1024)));
        }

        for (std::vector<void*>::iterator it = batch.begin();
             it != batch.end(); ++it) {
            free(*it);
        }
    }

    return NULL;
}

private:
ucs::handle<uct_md_config_t*> m_md_config;
ucs::handle<uct_md_h> m_pd;
Expand Down Expand Up @@ -258,6 +273,43 @@ UCS_TEST_P(test_pd, alloc_advise) {
uct_md_mem_free(pd(), memh);
}

/*
 * Reproduce issue #1284: the main thread registers memory while another
 * thread concurrently allocates and releases memory.
 *
 * Once the helper thread is running, failures inside the loop are reported
 * with non-fatal checks followed by a break. This way the buffer is freed and
 * the helper thread is always stopped and joined before stop_flag, which it
 * polls through a pointer, goes out of scope.
 */
UCS_TEST_P(test_pd, reg_multi_thread) {
    ucs_status_t status;

    check_caps(UCT_MD_FLAG_REG, "registration");

    pthread_t thread_id;
    int stop_flag = 0;
    /* No thread exists if creation fails, so a fatal check is safe here */
    int ret = pthread_create(&thread_id, NULL, alloc_thread, &stop_flag);
    ASSERT_EQ(0, ret) << "pthread_create() failed";

    ucs_time_t start_time = ucs_get_time();
    while (ucs_get_time() - start_time < ucs_time_from_sec(0.5)) {
        const size_t size = (rand() % 65536) + 1;

        void *buffer = malloc(size);
        EXPECT_TRUE(buffer != NULL);
        if (buffer == NULL) {
            break;
        }

        uct_mem_h memh = UCT_MEM_HANDLE_NULL;
        status = uct_md_mem_reg(pd(), buffer, size, UCT_MD_MEM_FLAG_NONBLOCK, &memh);
        EXPECT_EQ(UCS_OK, status) << "Error: " << ucs_status_string(status)
                                  << " buffer=" << buffer << " size=" << size;
        if (status != UCS_OK) {
            free(buffer);
            break;
        }

        EXPECT_TRUE(memh != UCT_MEM_HANDLE_NULL);
        if (memh == UCT_MEM_HANDLE_NULL) {
            free(buffer);
            break;
        }

        /* Give the allocating thread a chance to run while memory is registered */
        sched_yield();

        status = uct_md_mem_dereg(pd(), memh);
        EXPECT_UCS_OK(status);
        free(buffer);
    }

    /* Always reached after the thread was created: stop it and wait for it */
    stop_flag = 1;
    pthread_join(thread_id, NULL);
}


#define UCT_PD_INSTANTIATE_TEST_CASE(_test_case) \
UCS_PP_FOREACH(_UCT_PD_INSTANTIATE_TEST_CASE, _test_case, \
knem, \
Expand Down

0 comments on commit 778d47b

Please sign in to comment.