Skip to content

Commit

Permalink
UCT/TAG: Do not post the same buffer more than once
Browse files Browse the repository at this point in the history
  • Loading branch information
brminich committed Dec 2, 2019
1 parent 721ab58 commit 65015fb
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 48 deletions.
5 changes: 3 additions & 2 deletions src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,9 @@ ucp_tag_offload_do_post(ucp_request_t *req)
req->recv.tag.tag_mask, &iov, 1,
&req->recv.uct_ctx);
if (status != UCS_OK) {
ucs_assert((status == UCS_ERR_NO_RESOURCE) ||
(status == UCS_ERR_EXCEEDS_LIMIT));
ucs_assert((status == UCS_ERR_NO_RESOURCE) ||
(status == UCS_ERR_EXCEEDS_LIMIT) ||
(status == UCS_ERR_ALREADY_EXISTS));
/* No more matching entries in the transport.
* TODO keep registration in case SW RNDV protocol will be used */
ucp_tag_offload_release_buf(req, 1);
Expand Down
23 changes: 23 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5.inl
Original file line number Diff line number Diff line change
Expand Up @@ -1016,10 +1016,19 @@ uct_rc_mlx5_iface_common_tag_recv(uct_rc_mlx5_iface_common_t *iface,
uct_rc_mlx5_tag_entry_t *tag_entry;
uint16_t next_idx;
unsigned ctrl_size;
int ret;

UCT_CHECK_IOV_SIZE(iovcnt, 1ul, "uct_rc_mlx5_iface_common_tag_recv");
UCT_RC_MLX5_CHECK_TAG(iface);

kh_put(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, iov->buffer, &ret);
if (ucs_unlikely(!ret)) {
/* Do not post the same buffer more than once (even with different tags)
* to avoid memory corruption. */
return UCS_ERR_ALREADY_EXISTS;
}
ucs_assert(ret > 0);

ctrl_size = sizeof(struct mlx5_wqe_ctrl_seg) +
sizeof(uct_rc_mlx5_wqe_tm_seg_t);
tag_entry = iface->tm.head;
Expand Down Expand Up @@ -1053,6 +1062,17 @@ uct_rc_mlx5_iface_common_tag_recv(uct_rc_mlx5_iface_common_t *iface,
return UCS_OK;
}

/* Remove a receive buffer address from the iface tag-address hash once it is
 * no longer posted to the HW, so the same address may be posted again. */
static UCS_F_ALWAYS_INLINE void
uct_rc_mlx5_iface_tag_del_from_hash(uct_rc_mlx5_iface_common_t *iface,
                                    void *buffer)
{
    khiter_t it = kh_get(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, buffer);

    /* Every posted buffer was inserted on tag_recv; a miss here means the
     * bookkeeping is broken. */
    ucs_assert(it != kh_end(&iface->tm.tag_addrs));
    kh_del(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, it);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_mlx5_iface_common_tag_recv_cancel(uct_rc_mlx5_iface_common_t *iface,
uct_tag_context_t *ctx, int force)
Expand All @@ -1067,6 +1087,7 @@ uct_rc_mlx5_iface_common_tag_recv_cancel(uct_rc_mlx5_iface_common_t *iface,
if (ucs_likely(force)) {
flags = UCT_RC_MLX5_SRQ_FLAG_TM_SW_CNT;
uct_rc_mlx5_release_tag_entry(iface, tag_entry);
uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);
} else {
flags = UCT_RC_MLX5_SRQ_FLAG_TM_CQE_REQ | UCT_RC_MLX5_SRQ_FLAG_TM_SW_CNT;
uct_rc_mlx5_add_cmd_wq_op(iface, tag_entry);
Expand Down Expand Up @@ -1097,6 +1118,7 @@ uct_rc_mlx5_iface_handle_tm_list_op(uct_rc_mlx5_iface_common_t *iface, int opcod
if (opcode == UCT_RC_MLX5_CQE_APP_OP_TM_REMOVE) {
ctx = op->tag->ctx;
priv = uct_rc_mlx5_ctx_priv(ctx);
uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);
ctx->completed_cb(ctx, priv->tag, 0, priv->length, UCS_ERR_CANCELED);
}
}
Expand Down Expand Up @@ -1142,6 +1164,7 @@ uct_rc_mlx5_iface_handle_expected(uct_rc_mlx5_iface_common_t *iface, struct mlx5
byte_len = ntohl(cqe->byte_cnt);

uct_rc_mlx5_release_tag_entry(iface, tag_entry);
uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);

if (cqe->op_own & MLX5_INLINE_SCATTER_64) {
ucs_assert(byte_len <= priv->length);
Expand Down
12 changes: 12 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
{
uct_rc_mlx5_mp_hash_key_t key_gid;
uint64_t key_lid;
void *tag_addr;

if (!UCT_RC_MLX5_TM_ENABLED(iface)) {
return;
Expand All @@ -425,6 +426,12 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
ucs_free(iface->tm.cmd_wq.ops);
uct_rc_mlx5_tag_cleanup(iface);

kh_foreach_key(&iface->tm.tag_addrs, tag_addr, {
ucs_debug("destroying iface %p, with %p offloaded to the HW",
iface, tag_addr);
});
kh_destroy_inplace(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs);

if (!UCT_RC_MLX5_MP_ENABLED(iface)) {
return;
}
Expand All @@ -440,6 +447,7 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
iface, key_gid.guid, key_gid.qp_num);
});
kh_destroy_inplace(uct_rc_mlx5_mp_hash_gid, &iface->tm.mp.hash_gid);

ucs_mpool_cleanup(&iface->tm.mp.tx_mp, 1);
}

Expand Down Expand Up @@ -734,6 +742,10 @@ void uct_rc_mlx5_init_rx_tm_common(uct_rc_mlx5_iface_common_t *iface,
* ptr_array is used as operation ID and is passed in "app_context"
* of TM header. */
ucs_ptr_array_init(&iface->tm.rndv_comps, 0, "rm_rndv_completions");

/* Set of addresses posted to the HW. Used to avoid posting of the same
* address more than once. */
kh_init_inplace(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs);
}

ucs_status_t uct_rc_mlx5_init_rx_tm(uct_rc_mlx5_iface_common_t *iface,
Expand Down
85 changes: 45 additions & 40 deletions src/uct/ib/rc/accel/rc_mlx5_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,68 +354,73 @@ typedef union uct_rc_mlx5_dm_copy_data {
} UCS_S_PACKED uct_rc_mlx5_dm_copy_data_t;
#endif

#define uct_rc_mlx5_tag_addr_hash(_ptr) kh_int64_hash_func((uintptr_t)(_ptr))
KHASH_INIT(uct_rc_mlx5_tag_addrs, void*, char, 0, uct_rc_mlx5_tag_addr_hash,
kh_int64_hash_equal)

typedef struct uct_rc_mlx5_iface_common {
uct_rc_iface_t super;
uct_rc_iface_t super;
struct {
ucs_mpool_t atomic_desc_mp;
uct_ib_mlx5_mmio_mode_t mmio_mode;
uint16_t bb_max; /* limit number of outstanding WQE BBs */
ucs_mpool_t atomic_desc_mp;
uct_ib_mlx5_mmio_mode_t mmio_mode;
uint16_t bb_max; /* limit number of outstanding WQE BBs */
} tx;
struct {
uct_ib_mlx5_srq_t srq;
void *pref_ptr;
uct_ib_mlx5_srq_t srq;
void *pref_ptr;
} rx;
uct_ib_mlx5_cq_t cq[UCT_IB_DIR_NUM];
uct_ib_mlx5_cq_t cq[UCT_IB_DIR_NUM];
struct {
uct_rc_mlx5_cmd_wq_t cmd_wq;
uct_rc_mlx5_tag_entry_t *head;
uct_rc_mlx5_tag_entry_t *tail;
uct_rc_mlx5_tag_entry_t *list;
ucs_mpool_t *bcopy_mp;

ucs_ptr_array_t rndv_comps;
size_t max_bcopy;
size_t max_zcopy;
unsigned num_tags;
unsigned num_outstanding;
unsigned max_rndv_data;
uint16_t unexpected_cnt;
uint16_t cmd_qp_len;
uint8_t enabled;
uct_rc_mlx5_cmd_wq_t cmd_wq;
uct_rc_mlx5_tag_entry_t *head;
uct_rc_mlx5_tag_entry_t *tail;
uct_rc_mlx5_tag_entry_t *list;
ucs_mpool_t *bcopy_mp;
khash_t(uct_rc_mlx5_tag_addrs) tag_addrs;

ucs_ptr_array_t rndv_comps;
size_t max_bcopy;
size_t max_zcopy;
unsigned num_tags;
unsigned num_outstanding;
unsigned max_rndv_data;
uint16_t unexpected_cnt;
uint16_t cmd_qp_len;
uint8_t enabled;
struct {
uint8_t num_strides;
ucs_mpool_t tx_mp;
uct_rc_mlx5_mp_context_t last_frag_ctx;
uint8_t num_strides;
ucs_mpool_t tx_mp;
uct_rc_mlx5_mp_context_t last_frag_ctx;
khash_t(uct_rc_mlx5_mp_hash_lid) hash_lid;
khash_t(uct_rc_mlx5_mp_hash_gid) hash_gid;
} mp;
struct {
void *arg; /* User defined arg */
uct_tag_unexp_eager_cb_t cb; /* Callback for unexpected eager messages */
void *arg; /* User defined arg */
uct_tag_unexp_eager_cb_t cb; /* Callback for unexpected eager messages */
} eager_unexp;

struct {
void *arg; /* User defined arg */
uct_tag_unexp_rndv_cb_t cb; /* Callback for unexpected rndv messages */
void *arg; /* User defined arg */
uct_tag_unexp_rndv_cb_t cb; /* Callback for unexpected rndv messages */
} rndv_unexp;
uct_rc_mlx5_release_desc_t eager_desc;
uct_rc_mlx5_release_desc_t rndv_desc;
uct_rc_mlx5_release_desc_t am_desc;
uct_rc_mlx5_release_desc_t eager_desc;
uct_rc_mlx5_release_desc_t rndv_desc;
uct_rc_mlx5_release_desc_t am_desc;
UCS_STATS_NODE_DECLARE(stats)
} tm;
#if HAVE_IBV_DM
struct {
uct_mlx5_dm_data_t *dm;
size_t seg_len; /* cached value to avoid double-pointer access */
ucs_status_t (*am_short)(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
const void *payload, unsigned length);
ucs_status_t (*tag_short)(uct_ep_h tl_ep, uct_tag_t tag,
const void *data, size_t length);
uct_mlx5_dm_data_t *dm;
size_t seg_len; /* cached value to avoid double-pointer access */
ucs_status_t (*am_short)(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
const void *payload, unsigned length);
ucs_status_t (*tag_short)(uct_ep_h tl_ep, uct_tag_t tag,
const void *data, size_t length);
} dm;
#endif
struct {
uint8_t atomic_fence_flag;
uint8_t put_fence_flag;
uint8_t atomic_fence_flag;
uint8_t put_fence_flag;
} config;
UCS_STATS_NODE_DECLARE(stats)
} uct_rc_mlx5_iface_common_t;
Expand Down
4 changes: 3 additions & 1 deletion test/gtest/ucp/test_ucp_tag_offload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ class test_ucp_tag_offload_stats : public test_ucp_tag_offload_multi {
UCP_TAG_MASK_FULL);

// Post and cancel another receive to make sure the first one was offloaded
request *req2 = recv_nb_and_check(buffer, count, DATATYPE, tag,
size_t size = receiver().worker()->context->config.ext.tm_thresh + 1;
std::vector<char> tbuf(size, 0);
request *req2 = recv_nb_and_check(&tbuf[0], size, DATATYPE, tag,
UCP_TAG_MASK_FULL);
req_cancel(receiver(), req2);

Expand Down
56 changes: 51 additions & 5 deletions test/gtest/uct/test_tag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ class test_tag : public uct_test {
// Message should be reported as unexpected and filled with
// recv seed (unchanged), as the incoming tag does not match the expected
check_rx_completion(r_ctx, false, RECV_SEED);
ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
flush();
}

Expand Down Expand Up @@ -650,16 +651,16 @@ UCS_TEST_SKIP_COND_P(test_tag, tag_limit,
!check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
const size_t length = 32;
mapped_buffer recvbuf(length, RECV_SEED, receiver());
ucs::ptr_vector<recv_ctx> rctxs;
recv_ctx *rctx_p;
ucs::ptr_vector<mapped_buffer> rbufs;
ucs_status_t status;

do {
// Can use the same recv buffer, as no sends will be issued.
rctx_p = (new recv_ctx());
init_recv_ctx(*rctx_p, &recvbuf, 1);
recv_ctx *rctx_p = new recv_ctx();
mapped_buffer *buf_p = new mapped_buffer(length, RECV_SEED, receiver());
init_recv_ctx(*rctx_p, buf_p, 1);
rctxs.push_back(rctx_p);
rbufs.push_back(buf_p);
status = tag_post(receiver(), *rctx_p);
// Make sure send resources are acknowledged, as we
// awaiting for tag space exhaustion.
Expand All @@ -678,6 +679,51 @@ UCS_TEST_SKIP_COND_P(test_tag, tag_limit,
status = tag_post(receiver(), rctxs.at(0));
} while ((ucs_get_time() < deadline) && (status == UCS_ERR_EXCEEDS_LIMIT));
ASSERT_UCS_OK(status);

// remove posted tags from HW
for (ucs::ptr_vector<recv_ctx>::const_iterator iter = rctxs.begin();
iter != rctxs.end() - 1; ++iter) {
ASSERT_UCS_OK(tag_cancel(receiver(), **iter, 1));
}
}

/* Verify that the same receive buffer cannot be posted to the HW more than
 * once: a second post must fail with UCS_ERR_ALREADY_EXISTS until the first
 * posting is removed — either by a forced cancel, by a completed soft
 * cancel, or by an incoming message consuming the tag. */
UCS_TEST_SKIP_COND_P(test_tag, tag_post_same,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    const size_t length = 128;
    mapped_buffer recvbuf(length, RECV_SEED, receiver());
    recv_ctx r_ctx;
    init_recv_ctx(r_ctx, &recvbuf, 1);

    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    // Can't post the same buffer until it is completed/cancelled
    ucs_status_t status = tag_post(receiver(), r_ctx);
    EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS);

    // Cancel with force, should be able to re-post immediately
    // (forced cancel releases the tag entry synchronously)
    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    // Cancel without force, should be able to re-post when receive completion
    // arrives (soft cancel completes asynchronously)
    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 0));
    status = tag_post(receiver(), r_ctx);
    EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS); // no completion yet

    wait_for_flag(&r_ctx.comp); // cancel completed, should be able to post
    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    // Now send something to trigger rx completion
    init_recv_ctx(r_ctx, &recvbuf, 1); // reinit rx to clear completed states
    mapped_buffer sendbuf(length, SEND_SEED, sender());
    send_ctx s_ctx;
    init_send_ctx(s_ctx, &sendbuf, 1, reinterpret_cast<uint64_t>(&r_ctx));
    ASSERT_UCS_OK(tag_eager_bcopy(sender(), s_ctx));

    wait_for_flag(&r_ctx.comp); // message consumed, should be able to post
    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    // Leave no buffer posted to the HW at test exit
    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
}

UCS_TEST_SKIP_COND_P(test_tag, sw_rndv_expected,
Expand Down

0 comments on commit 65015fb

Please sign in to comment.