Skip to content

Commit

Permalink
Merge pull request #5529 from yosefe/topic/uct-rc-verbs-use-non-empty…
Browse files Browse the repository at this point in the history
…-rdma-v1.9.x

UCT/RC/VERBS: Use non-empty RDMA_WRITE for endpoint flush - v1.9.x
  • Loading branch information
yosefe authored Aug 6, 2020
2 parents 6e1bafc + c946df1 commit ffc8360
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 30 deletions.
9 changes: 0 additions & 9 deletions src/uct/ib/rc/base/rc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,6 @@ struct uct_rc_ep {
UCS_CLASS_DECLARE(uct_rc_ep_t, uct_rc_iface_t*, uint32_t, const uct_ep_params_t*);


/**
 * Packed RC endpoint address, as written by an endpoint's get_address and read
 * back by connect_to_ep on the peer. The layout is part of the exchanged
 * address format, so field order and sizes must not change.
 */
typedef struct uct_rc_ep_address {
/* Bitmask of UCT_RC_ADDR_* flags (e.g. UCT_RC_ADDR_HAS_ATOMIC_MR) */
uint8_t flags;
/* Queue pair number of the endpoint, stored in packed 24-bit form */
uct_ib_uint24_t qp_num;
} UCS_S_PACKED uct_rc_ep_address_t;

/**
 * Flag bits stored in the 'flags' field of uct_rc_ep_address_t.
 */
enum {
/* Set when one extra byte holding the atomic MR id follows the address struct */
UCT_RC_ADDR_HAS_ATOMIC_MR = UCS_BIT(0)
};

void uct_rc_ep_packet_dump(uct_base_iface_t *iface, uct_am_trace_type_t type,
void *data, size_t length, size_t valid_length,
char *buffer, size_t max);
Expand Down
25 changes: 24 additions & 1 deletion src/uct/ib/rc/verbs/rc_verbs.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <uct/ib/rc/base/rc_ep.h>
#include <ucs/type/class.h>


#define UCT_RC_VERBS_IFACE_FOREACH_TXWQE(_iface, _i, _wc, _num_wcs) \
status = uct_ib_poll_cq((_iface)->super.cq[UCT_IB_DIR_TX], &_num_wcs, _wc); \
if (status != UCS_OK) { \
Expand All @@ -21,11 +22,25 @@
for (_i = 0; _i < _num_wcs; ++_i)


/**
 * Flag bits stored in the 'flags' field of uct_rc_verbs_ep_address_t.
 */
enum {
/* Set when one extra byte holding the atomic MR id follows the address struct */
UCT_RC_VERBS_ADDR_HAS_ATOMIC_MR = UCS_BIT(0)
};


/**
 * Packed RC verbs endpoint address, filled by uct_rc_verbs_ep_get_address()
 * and consumed by uct_rc_verbs_ep_connect_to_ep() on the peer. The layout is
 * part of the exchanged address format, so field order and sizes must not
 * change. When UCT_RC_VERBS_ADDR_HAS_ATOMIC_MR is set in 'flags', a single
 * byte with the atomic MR id is stored right after this structure.
 */
typedef struct uct_rc_verbs_ep_address {
/* Bitmask of UCT_RC_VERBS_ADDR_* flags */
uint8_t flags;
/* Queue pair number of the endpoint, stored in packed 24-bit form */
uct_ib_uint24_t qp_num;
/* Address of the sender's flush memory, target of the peer's flush RDMA_WRITE */
uint64_t flush_addr;
/* Remote key of the memory region that covers flush_addr */
uint32_t flush_rkey;
} UCS_S_PACKED uct_rc_verbs_ep_address_t;


/**
 * Pair of 16-bit send counters kept per RC verbs endpoint (the 'txcnt' member
 * of uct_rc_verbs_ep_t): one advanced when posting, one when completions are
 * polled.
 */
typedef struct uct_rc_verbs_txcnt {
uint16_t pi; /* producer (post_send) count */
uint16_t ci; /* consumer (ibv_poll_cq) completion count */
} uct_rc_verbs_txcnt_t;


/**
* RC verbs communication context.
*/
Expand All @@ -34,6 +49,10 @@ typedef struct uct_rc_verbs_ep {
uct_rc_verbs_txcnt_t txcnt;
uct_ib_fence_info_t fi;
struct ibv_qp *qp;
struct {
uintptr_t remote_addr;
uint32_t rkey;
} flush;
} uct_rc_verbs_ep_t;


Expand All @@ -58,7 +77,9 @@ typedef struct uct_rc_verbs_iface {
struct ibv_sge inl_sge[2];
uct_rc_am_short_hdr_t am_inl_hdr;
ucs_mpool_t short_desc_mp;
uct_rc_iface_send_desc_t *fc_desc; /* used when max_inline is zero */
uct_rc_iface_send_desc_t *fc_desc; /* used when max_inline is zero */
struct ibv_mr *flush_mr; /* MR for writing dummy value to flush */
void *flush_mem;
struct {
size_t short_desc_size;
size_t max_inline;
Expand All @@ -68,6 +89,8 @@ typedef struct uct_rc_verbs_iface {
} uct_rc_verbs_iface_t;


/**
 * Create, on first use, the memory page and the memory region that remote
 * endpoints target with their flush RDMA_WRITE. If the interface already has
 * a flush memory region, the call returns UCS_OK without creating anything.
 *
 * @param [in] iface  RC verbs interface whose flush_mem / flush_mr are set.
 *
 * @return UCS_OK on success, UCS_ERR_NO_MEMORY if the page could not be
 *         mapped, or the error status of the memory registration.
 */
ucs_status_t uct_rc_verbs_iface_flush_mem_create(uct_rc_verbs_iface_t *iface);

UCS_CLASS_DECLARE(uct_rc_verbs_ep_t, const uct_ep_params_t *);
UCS_CLASS_DECLARE_NEW_FUNC(uct_rc_verbs_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DECLARE_DELETE_FUNC(uct_rc_verbs_ep_t, uct_ep_t);
Expand Down
75 changes: 60 additions & 15 deletions src/uct/ib/rc/verbs/rc_verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,36 @@ ucs_status_t uct_rc_verbs_ep_atomic_cswap64(uct_ep_h tl_ep, uint64_t compare, ui
remote_addr, rkey, comp);
}

/**
 * Post a signaled 1-byte RDMA_WRITE to the peer's flush memory, so that a
 * completion is generated for the work posted on this endpoint so far.
 *
 * @param [in] ep  Endpoint to post the flush operation on. Its flush.remote_addr
 *                 and flush.rkey are used as the RDMA target.
 *
 * @return UCS_OK after the work request has been handed to the send helper.
 *         NOTE(review): UCT_RC_CHECK_RES is a macro defined elsewhere; it
 *         presumably returns an error status early when no send resources are
 *         available - confirm against its definition.
 */
static ucs_status_t uct_rc_verbs_ep_post_flush(uct_rc_verbs_ep_t *ep)
{
    uct_rc_verbs_iface_t *iface = ucs_derived_of(ep->super.super.super.iface,
                                                 uct_rc_verbs_iface_t);
    int send_flags              = IBV_SEND_SIGNALED;
    struct ibv_sge flush_sge;
    struct ibv_send_wr flush_wr;

    UCT_RC_CHECK_RES(&iface->super, &ep->super);

    /*
     * The flush is a tiny, non-empty RDMA_WRITE: a zero-length RDMA_WRITE or
     * an inline send is not supported by every adapter.
     */
    flush_sge.addr   = (uintptr_t)iface->flush_mem;
    flush_sge.length = 1;
    flush_sge.lkey   = iface->flush_mr->lkey;

    flush_wr.next                = NULL;
    flush_wr.sg_list             = &flush_sge;
    flush_wr.num_sge             = 1;
    flush_wr.opcode              = IBV_WR_RDMA_WRITE;
    flush_wr.wr.rdma.remote_addr = ep->flush.remote_addr;
    flush_wr.wr.rdma.rkey        = ep->flush.rkey;

    /* Send the payload inline only if the interface can inline that much */
    if (iface->config.max_inline >= flush_sge.length) {
        send_flags |= IBV_SEND_INLINE;
    }

    uct_rc_verbs_ep_post_send(iface, ep, &flush_wr, send_flags, 1);
    return UCS_OK;
}

ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp)
{
Expand All @@ -403,7 +433,7 @@ ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags,
}

if (uct_rc_txqp_unsignaled(&ep->super.txqp) != 0) {
status = uct_rc_verbs_ep_put_short(tl_ep, NULL, 0, 0, 0);
status = uct_rc_verbs_ep_post_flush(ep);
if (status != UCS_OK) {
return status;
}
Expand Down Expand Up @@ -480,18 +510,27 @@ ucs_status_t uct_rc_verbs_ep_handle_failure(uct_rc_verbs_ep_t *ep,

/*
 * Fill the endpoint address that a peer needs in order to connect to this
 * endpoint: flags, the QP number, and the address and rkey of the interface's
 * flush memory (created on demand). If the memory domain reports an atomic MR
 * id, the HAS_ATOMIC_MR flag is set and the id is stored in the byte right
 * after the address structure.
 *
 * Returns UCS_OK, or the error from uct_rc_verbs_iface_flush_mem_create().
 *
 * NOTE(review): this span is a diff rendering - it contains both the
 * pre-change and the post-change lines (e.g. two declarations of 'ep', 'md'
 * and 'rc_addr', and two variants of the atomic MR id store), so it is not
 * compilable as shown.
 */
ucs_status_t uct_rc_verbs_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t);
uct_rc_ep_address_t *rc_addr = (uct_rc_ep_address_t*)addr;
uct_ib_md_t *md = uct_ib_iface_md(ucs_derived_of(
tl_ep->iface, uct_ib_iface_t));
void *ptr = rc_addr + 1;
uct_rc_verbs_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_rc_verbs_iface_t);
uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t);
uct_ib_md_t *md = uct_ib_iface_md(&iface->super.super);
uct_rc_verbs_ep_address_t *rc_addr = (uct_rc_verbs_ep_address_t*)addr;
ucs_status_t status;
uint8_t mr_id;

/* Make sure the interface's flush page and its memory region exist */
status = uct_rc_verbs_iface_flush_mem_create(iface);
if (status != UCS_OK) {
return status;
}

/* Publish the flush target so the peer can direct its flush RDMA_WRITE at it */
rc_addr->flags = 0;
rc_addr->flush_addr = (uintptr_t)iface->flush_mem;
rc_addr->flush_rkey = iface->flush_mr->rkey;
uct_ib_pack_uint24(rc_addr->qp_num, ep->qp->qp_num);

/* The atomic MR id, when available, is stored just past the address struct */
if (md->ops->get_atomic_mr_id(md, &mr_id) == UCS_OK) {
rc_addr->flags |= UCT_RC_ADDR_HAS_ATOMIC_MR;
*(uint8_t*)ptr = mr_id;
rc_addr->flags |= UCT_RC_VERBS_ADDR_HAS_ATOMIC_MR;
*(uint8_t*)(rc_addr + 1) = mr_id;
}
return UCS_OK;
}
Expand All @@ -500,11 +539,13 @@ ucs_status_t uct_rc_verbs_ep_connect_to_ep(uct_ep_h tl_ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr)
{
uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t);
uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t *)dev_addr;
const uct_rc_ep_address_t *rc_addr = (const uct_rc_ep_address_t*)ep_addr;
const void *ptr = rc_addr + 1;
uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep,
uct_rc_verbs_ep_t);
uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_rc_iface_t);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t *)dev_addr;
const uct_rc_verbs_ep_address_t *rc_addr =
(const uct_rc_verbs_ep_address_t*)ep_addr;
ucs_status_t status;
uint32_t qp_num;
struct ibv_ah_attr ah_attr;
Expand All @@ -514,14 +555,18 @@ ucs_status_t uct_rc_verbs_ep_connect_to_ep(uct_ep_h tl_ep,
ep->super.path_index, &ah_attr,
&path_mtu);
ucs_assert(path_mtu != UCT_IB_ADDRESS_INVALID_PATH_MTU);

qp_num = uct_ib_unpack_uint24(rc_addr->qp_num);
status = uct_rc_iface_qp_connect(iface, ep->qp, qp_num, &ah_attr, path_mtu);
if (status != UCS_OK) {
return status;
}

if (rc_addr->flags & UCT_RC_ADDR_HAS_ATOMIC_MR) {
ep->super.atomic_mr_offset = *(uint8_t*)ptr;
ep->flush.remote_addr = rc_addr->flush_addr;
ep->flush.rkey = rc_addr->flush_rkey;

if (rc_addr->flags & UCT_RC_VERBS_ADDR_HAS_ATOMIC_MR) {
ep->super.atomic_mr_offset = *(uint8_t*)(rc_addr + 1);
} else {
ep->super.atomic_mr_offset = 0;
}
Expand Down
57 changes: 53 additions & 4 deletions src/uct/ib/rc/verbs/rc_verbs_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,55 @@ static ucs_status_t uct_rc_verbs_iface_query(uct_iface_h tl_iface, uct_iface_att
iface_attr->latency.m += 1e-9; /* 1 ns per each extra QP */
iface_attr->overhead = 75e-9; /* Software overhead */

iface_attr->ep_addr_len = sizeof(uct_rc_ep_address_t);
iface_attr->ep_addr_len = sizeof(uct_rc_verbs_ep_address_t);
if (md->ops->get_atomic_mr_id(md, &mr_id) == UCS_OK) {
iface_attr->ep_addr_len += sizeof(mr_id);
}

return UCS_OK;
}

/**
 * Create, on first use, the page and memory region that remote endpoints
 * target with their flush RDMA_WRITE. Calling it again once the region exists
 * is a no-op that returns UCS_OK.
 *
 * @param [in] iface  RC verbs interface; on success its flush_mem and flush_mr
 *                    members point to the new page and its registration.
 *
 * @return UCS_OK on success, UCS_ERR_NO_MEMORY if the page could not be
 *         mapped, or the error status returned by uct_ib_reg_mr(). On failure
 *         the interface is left unchanged and the page, if mapped, is released.
 */
ucs_status_t uct_rc_verbs_iface_flush_mem_create(uct_rc_verbs_iface_t *iface)
{
    uct_ib_md_t *ib_md = uct_ib_iface_md(&iface->super.super);
    struct ibv_mr *flush_mr;
    ucs_status_t ret;
    void *page;

    /* Already set up by an earlier call */
    if (iface->flush_mr != NULL) {
        ucs_assert(iface->flush_mem != NULL);
        return UCS_OK;
    }

    /*
     * The remote side issues a dummy RDMA_WRITE to this memory in order to
     * flush its outstanding operations. A dedicated full page is mapped so no
     * other allocation can share the page, which keeps it fork-safe.
     */
    page = ucs_mmap(NULL, ucs_get_page_size(), PROT_READ | PROT_WRITE,
                    MAP_PRIVATE | MAP_ANONYMOUS, -1, 0, "flush_mem");
    if (page == MAP_FAILED) {
        ucs_error("failed to allocate page for remote flush: %m");
        return UCS_ERR_NO_MEMORY;
    }

    ret = uct_ib_reg_mr(ib_md->pd, page, ucs_get_page_size(),
                        UCT_IB_MEM_ACCESS_FLAGS, &flush_mr);
    if (ret != UCS_OK) {
        /* Registration failed: give the page back before reporting the error */
        ucs_munmap(page, ucs_get_page_size());
        return ret;
    }

    iface->flush_mem = page;
    iface->flush_mr  = flush_mr;
    return UCS_OK;
}

static ucs_status_t
uct_rc_iface_verbs_init_rx(uct_rc_iface_t *rc_iface,
const uct_rc_iface_common_config_t *config)
Expand All @@ -199,8 +240,8 @@ void uct_rc_iface_verbs_cleanup_rx(uct_rc_iface_t *rc_iface)
uct_ib_destroy_srq(iface->srq);
}

static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t, uct_md_h md, uct_worker_h worker,
const uct_iface_params_t *params,
static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t, uct_md_h tl_md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_iface_config_t *tl_config)
{
uct_rc_verbs_iface_config_t *config =
Expand All @@ -218,7 +259,7 @@ static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t, uct_md_h md, uct_worker_h worke
init_attr.cq_len[UCT_IB_DIR_TX] = config->super.tx_cq_len;
init_attr.seg_size = config->super.super.super.seg_size;

UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t, &uct_rc_verbs_iface_ops, md,
UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t, &uct_rc_verbs_iface_ops, tl_md,
worker, params, &config->super.super, &init_attr);

self->config.tx_max_wr = ucs_min(config->tx_max_wr,
Expand All @@ -227,6 +268,8 @@ static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t, uct_md_h md, uct_worker_h worke
self->config.tx_max_wr / 4);
self->super.config.fence_mode = (uct_rc_fence_mode_t)config->super.super.fence_mode;
self->super.progress = uct_rc_verbs_iface_progress;
self->flush_mem = NULL;
self->flush_mr = NULL;

if ((config->super.super.fence_mode == UCT_RC_FENCE_MODE_WEAK) ||
(config->super.super.fence_mode == UCT_RC_FENCE_MODE_AUTO)) {
Expand Down Expand Up @@ -365,6 +408,12 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_verbs_iface_t)
{
uct_base_iface_progress_disable(&self->super.super.super.super,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

if (self->flush_mr != NULL) {
uct_ib_dereg_mr(self->flush_mr);
ucs_assert(self->flush_mem != NULL);
ucs_munmap(self->flush_mem, ucs_get_page_size());
}
if (self->fc_desc != NULL) {
ucs_mpool_put(self->fc_desc);
}
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/common/test_obj_size.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ UCS_TEST_F(test_obj_size, size) {
EXPECTED_SIZE(uct_tcp_ep_t, 160);
# if HAVE_TL_RC
EXPECTED_SIZE(uct_rc_ep_t, 64);
EXPECTED_SIZE(uct_rc_verbs_ep_t, 80);
EXPECTED_SIZE(uct_rc_verbs_ep_t, 96);
# endif
# if HAVE_TL_DC
EXPECTED_SIZE(uct_dc_mlx5_ep_t, 32);
Expand Down

0 comments on commit ffc8360

Please sign in to comment.