Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v1.22.x] util/av: Backport a few changes #10432

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 53 additions & 13 deletions include/ofi_mem.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ struct ofi_bufpool_hdr {
struct ofi_bufpool_region *region;
size_t index;

OFI_DBG_VAR(bool, allocated)
OFI_DBG_VAR(struct ofi_bufpool_ftr *, ftr)
OFI_DBG_VAR(size_t, magic)
};
Expand Down Expand Up @@ -433,15 +432,32 @@ static inline struct ofi_bufpool *ofi_buf_pool(void *buf)
return ofi_buf_region(buf)->pool;
}

static inline bool ofi_buf_is_valid(void *buf)
{
struct ofi_bufpool_hdr *buf_hdr = ofi_buf_hdr(buf);
return buf_hdr->entry.slist.next == &buf_hdr->entry.slist;
}

static inline bool ofi_bufpool_ibuf_is_valid(struct ofi_bufpool *pool, size_t index)
{
void *buf;
size_t region_index = index / pool->attr.chunk_cnt;

assert(region_index < pool->region_cnt);

buf = pool->region_table[region_index]->mem_region +
(index % pool->attr.chunk_cnt) * pool->entry_size;

return ofi_buf_is_valid(buf);
}

static inline void ofi_buf_free(void *buf)
{
assert(ofi_atomic_dec32(&ofi_buf_region(buf)->use_cnt) >= 0);
assert(!(ofi_buf_pool(buf)->attr.flags & OFI_BUFPOOL_INDEXED));
assert(ofi_buf_hdr(buf)->magic == OFI_MAGIC_SIZE_T);
assert(ofi_buf_hdr(buf)->ftr->magic == OFI_MAGIC_SIZE_T);
assert(ofi_buf_hdr(buf)->allocated == true);

OFI_DBG_SET(ofi_buf_hdr(buf)->allocated, false);
assert(ofi_buf_is_valid(buf));

slist_insert_head(&ofi_buf_hdr(buf)->entry.slist,
&ofi_buf_pool(buf)->free_list.entries);
Expand All @@ -460,9 +476,7 @@ static inline void ofi_ibuf_free(void *buf)
assert(ofi_buf_pool(buf)->attr.flags & OFI_BUFPOOL_INDEXED);
assert(buf_hdr->magic == OFI_MAGIC_SIZE_T);
assert(buf_hdr->ftr->magic == OFI_MAGIC_SIZE_T);
assert(buf_hdr->allocated == true);

OFI_DBG_SET(buf_hdr->allocated, false);
assert(ofi_buf_is_valid(buf));

dlist_insert_order(&buf_hdr->region->free_list,
ofi_ibuf_is_lower, &buf_hdr->entry.dlist);
Expand All @@ -487,8 +501,7 @@ static inline void *ofi_bufpool_get_ibuf(struct ofi_bufpool *pool, size_t index)

buf = pool->region_table[region_index]->mem_region +
(index % pool->attr.chunk_cnt) * pool->entry_size;

assert(ofi_buf_hdr(buf)->allocated);
assert(ofi_buf_is_valid(buf));

return buf;
}
Expand Down Expand Up @@ -516,9 +529,9 @@ static inline void *ofi_buf_alloc(struct ofi_bufpool *pool)
slist_remove_head_container(&pool->free_list.entries,
struct ofi_bufpool_hdr, buf_hdr, entry.slist);
assert(ofi_atomic_inc32(&buf_hdr->region->use_cnt));
assert(buf_hdr->allocated == false);
assert(!ofi_buf_is_valid(ofi_buf_data(buf_hdr)));

OFI_DBG_SET(buf_hdr->allocated, true);
buf_hdr->entry.slist.next = &buf_hdr->entry.slist;

return ofi_buf_data(buf_hdr);
}
Expand Down Expand Up @@ -552,15 +565,42 @@ static inline void *ofi_ibuf_alloc(struct ofi_bufpool *pool)
dlist_pop_front(&buf_region->free_list, struct ofi_bufpool_hdr,
buf_hdr, entry.dlist);
assert(ofi_atomic_inc32(&buf_hdr->region->use_cnt));
assert(buf_hdr->allocated == false);
assert(!ofi_buf_is_valid(ofi_buf_data(buf_hdr)));

OFI_DBG_SET(buf_hdr->allocated, true);
buf_hdr->entry.dlist.next = &buf_hdr->entry.dlist;

if (dlist_empty(&buf_region->free_list))
dlist_remove_init(&buf_region->entry);
return ofi_buf_data(buf_hdr);
}

static inline void *ofi_ibuf_alloc_at(struct ofi_bufpool *pool, size_t index)
{
void *buf;
struct ofi_bufpool_hdr *buf_hdr;
struct ofi_bufpool_region *buf_region;
size_t region_index = index / pool->attr.chunk_cnt;

assert(pool->attr.flags & OFI_BUFPOOL_INDEXED);
while (region_index >= pool->region_cnt) {
if (ofi_bufpool_grow(pool))
return NULL;
}
buf_region = pool->region_table[region_index];
buf = buf_region->mem_region +
(index % pool->attr.chunk_cnt) * pool->entry_size;
buf_hdr = ofi_buf_hdr(buf);
assert(ofi_atomic_inc32(&buf_hdr->region->use_cnt));
assert(!ofi_buf_is_valid(buf));

dlist_remove(&buf_hdr->entry.dlist);
buf_hdr->entry.dlist.next = &buf_hdr->entry.dlist;

if (dlist_empty(&buf_region->free_list))
dlist_remove_init(&buf_region->entry);

return buf;
}

/*
* Persistent memory support
Expand Down
4 changes: 2 additions & 2 deletions include/ofi_net.h
Original file line number Diff line number Diff line change
Expand Up @@ -903,14 +903,14 @@ uint32_t ofi_addr_format(const char *str);
int ofi_str_toaddr(const char *str, uint32_t *addr_format,
void **addr, size_t *len);

void ofi_straddr_log_internal(const char *func, int line,
void ofi_straddr_log_internal(const char *func, int line, uint32_t addr_format,
const struct fi_provider *prov,
enum fi_log_level level,
enum fi_log_subsys subsys, char *log_str,
const void *addr);

#define ofi_straddr_log(...) \
ofi_straddr_log_internal(__func__, __LINE__, __VA_ARGS__)
ofi_straddr_log_internal(__func__, __LINE__, FI_FORMAT_UNSPEC, __VA_ARGS__)

#if ENABLE_DEBUG
#define ofi_straddr_dbg(prov, subsystem, ...) \
Expand Down
5 changes: 5 additions & 0 deletions include/ofi_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,10 @@ static inline void ofi_ep_peer_rx_cntr_incerr(struct util_ep *ep, uint8_t op)
* AV / addressing
*/

#define ofi_av_straddr_log(av, level, ...) \
ofi_straddr_log_internal(__func__, __LINE__, av->domain->addr_format, \
av->prov, level, FI_LOG_AV, __VA_ARGS__)

struct util_av;
struct util_av_set;
struct util_peer_addr;
Expand Down Expand Up @@ -976,6 +980,7 @@ int ofi_av_close(struct util_av *av);
int ofi_av_close_lightweight(struct util_av *av);

size_t ofi_av_size(struct util_av *av);
int ofi_av_insert_addr_at(struct util_av *av, const void *addr, fi_addr_t fi_addr);
int ofi_av_insert_addr(struct util_av *av, const void *addr, fi_addr_t *fi_addr);
int ofi_av_remove_addr(struct util_av *av, fi_addr_t fi_addr);
fi_addr_t ofi_av_lookup_fi_addr_unsafe(struct util_av *av, const void *addr);
Expand Down
32 changes: 29 additions & 3 deletions prov/util/src/util_av.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,45 @@ int ofi_verify_av_insert(struct util_av *av, uint64_t flags, void *context)
return 0;
}

int ofi_av_insert_addr_at(struct util_av *av, const void *addr, fi_addr_t fi_addr)
{
struct util_av_entry *entry = NULL;

assert(ofi_mutex_held(&av->lock));
ofi_av_straddr_log(av, FI_LOG_INFO, "inserting addr", addr);
HASH_FIND(hh, av->hash, addr, av->addrlen, entry);
if (entry) {
if (fi_addr == ofi_buf_index(entry))
return FI_SUCCESS;

ofi_av_straddr_log(av, FI_LOG_WARN, "addr already in AV", addr);
return -FI_EALREADY;
}

entry = ofi_ibuf_alloc_at(av->av_entry_pool, fi_addr);
if (!entry)
return -FI_ENOMEM;

memcpy(entry->data, addr, av->addrlen);
ofi_atomic_initialize32(&entry->use_cnt, 1);
HASH_ADD(hh, av->hash, data, av->addrlen, entry);
FI_INFO(av->prov, FI_LOG_AV, "fi_addr: %" PRIu64 "\n",
ofi_buf_index(entry));
return 0;
}

int ofi_av_insert_addr(struct util_av *av, const void *addr, fi_addr_t *fi_addr)
{
struct util_av_entry *entry = NULL;

assert(ofi_mutex_held(&av->lock));
ofi_straddr_log(av->prov, FI_LOG_INFO, FI_LOG_AV, "inserting addr", addr);
ofi_av_straddr_log(av, FI_LOG_INFO, "inserting addr", addr);
HASH_FIND(hh, av->hash, addr, av->addrlen, entry);
if (entry) {
if (fi_addr)
*fi_addr = ofi_buf_index(entry);
if (ofi_atomic_inc32(&entry->use_cnt) > 1) {
ofi_straddr_log(av->prov, FI_LOG_WARN, FI_LOG_AV,
"addr already in AV", addr);
ofi_av_straddr_log(av, FI_LOG_WARN, "addr already in AV", addr);
}
} else {
entry = ofi_ibuf_alloc(av->av_entry_pool);
Expand Down
1 change: 0 additions & 1 deletion prov/util/src/util_buf.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ int ofi_bufpool_grow(struct ofi_bufpool *pool)
buf_hdr = ofi_buf_hdr(buf);
buf_hdr->region = buf_region;
buf_hdr->index = pool->entry_cnt + i;
OFI_DBG_SET(buf_hdr->allocated, false);
OFI_DBG_SET(buf_hdr->magic, OFI_MAGIC_SIZE_T);
OFI_DBG_SET(buf_hdr->ftr,
(struct ofi_bufpool_ftr *) ((char *) buf +
Expand Down
6 changes: 3 additions & 3 deletions src/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1053,19 +1053,19 @@ size_t ofi_mask_addr(struct sockaddr *maskaddr, const struct sockaddr *srcaddr,
return len;
}

void ofi_straddr_log_internal(const char *func, int line,
void ofi_straddr_log_internal(const char *func, int line, uint32_t addr_format,
const struct fi_provider *prov,
enum fi_log_level level,
enum fi_log_subsys subsys, char *log_str,
const void *addr)
{
char buf[OFI_ADDRSTRLEN];
uint32_t addr_format;
size_t len = sizeof(buf);

if (fi_log_enabled(prov, level, subsys)) {
if (addr) {
addr_format = ofi_translate_addr_format(ofi_sa_family(addr));
if (addr_format == FI_FORMAT_UNSPEC)
addr_format = ofi_translate_addr_format(ofi_sa_family(addr));
fi_log(prov, level, subsys, func, line, "%s: %s\n", log_str,
ofi_straddr(buf, &len, addr_format, addr));
} else {
Expand Down
Loading