Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/RC/DC: Fix iface tx ops buffer overflow - v1.5.x #3188

Merged
merged 2 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions src/uct/ib/dc/accel/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,6 @@ uct_dc_mlx5_iface_atomic_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
uct_rc_txqp_add_send_op(txqp, &desc->super);
}

/*
 * Attach a user completion to the TX queue pair that UCT_DC_MLX5_IFACE_TXQP_GET
 * resolves for the given endpoint.
 *
 * @param iface  DC mlx5 interface the endpoint belongs to.
 * @param ep     DC mlx5 endpoint; its base part (&ep->super) is used for the lookup.
 * @param comp   Completion handed to uct_rc_txqp_add_send_comp().
 */
static inline void uct_dc_mlx5_iface_add_send_comp(uct_dc_mlx5_iface_t *iface,
uct_dc_mlx5_ep_t *ep,
uct_completion_t *comp)
{
/* Declares the local txqp/txwq variables filled in by the lookup below */
UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

UCT_DC_MLX5_IFACE_TXQP_GET(iface, &ep->super, txqp, txwq);
/* The work queue's current sig_pi is passed as the last argument of the
 * send-completion registration */
uct_rc_txqp_add_send_comp(&iface->super.super, txqp, comp, txwq->sig_pi);
}

static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
uint64_t value, uint64_t remote_addr, uct_rkey_t rkey)
Expand Down Expand Up @@ -657,17 +647,20 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion
{
uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_dc_mlx5_iface_t);
uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
uct_dc_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_ep_t);
ucs_status_t status;
UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

status = uct_dc_ep_flush(tl_ep, flags, comp);
if (status == UCS_OK) {
return UCS_OK; /* all sends completed */
}

if (status == UCS_INPROGRESS) {
ucs_assert(ep->super.dci != UCT_DC_EP_NO_DCI);
uct_dc_mlx5_iface_add_send_comp(iface, ep, comp);
ucs_assert(ep->dci != UCT_DC_EP_NO_DCI);
UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
status = uct_rc_txqp_add_flush_comp(&iface->super.super, txqp, comp,
txwq->sig_pi);
}
return status;
}
Expand Down
9 changes: 7 additions & 2 deletions src/uct/ib/dc/verbs/dc_verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -567,16 +567,21 @@ ucs_status_t uct_dc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completio
uct_dc_verbs_iface_t);
uct_dc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_verbs_ep_t);
ucs_status_t status;
uint8_t dci;

status = uct_dc_ep_flush(tl_ep, flags, comp);
if (status == UCS_OK) {
return UCS_OK; /* all sends completed */
}

if (status == UCS_INPROGRESS) {
ucs_assert(ep->super.dci != UCT_DC_EP_NO_DCI);
uct_dc_verbs_iface_add_send_comp(iface, ep, comp);
dci = ep->super.dci;
ucs_assert(dci != UCT_DC_EP_NO_DCI);
status = uct_rc_txqp_add_flush_comp(&iface->super.super,
&iface->super.tx.dcis[dci].txqp,
comp, iface->dcis_txcnt[dci].pi);
}

return status;
}

Expand Down
9 changes: 6 additions & 3 deletions src/uct/ib/rc/accel/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,12 @@ ucs_status_t uct_rc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
sn = ep->tx.wq.sig_pi;
}

uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, comp, sn);
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
return UCS_INPROGRESS;
status = uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.txqp, comp, sn);
if (status == UCS_INPROGRESS) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
}

return status;
}

ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
Expand Down
9 changes: 9 additions & 0 deletions src/uct/ib/rc/base/rc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@ void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
uct_rc_iface_put_send_op(op);
}

/*
 * Completion handler for flush operations.
 *
 * Reports UCS_OK to the user completion stored in the operation and then
 * returns the operation object to its memory pool.
 *
 * @param op    Flush operation that has completed.
 * @param resp  Not used by this handler.
 */
void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
const void *resp)
{
uct_invoke_completion(op->user_comp, UCS_OK);
/* Must come last: op is not touched after it is returned to the pool */
ucs_mpool_put(op);
}

ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
unsigned flags)
{
Expand Down Expand Up @@ -501,6 +508,8 @@ void uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
op->flags &= ~UCT_RC_IFACE_SEND_OP_FLAG_INUSE;
if (op->handler == uct_rc_ep_send_op_completion_handler) {
uct_rc_iface_put_send_op(op);
} else if (op->handler == uct_rc_ep_flush_op_completion_handler) {
ucs_mpool_put(op);
} else {
desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);
ucs_mpool_put(desc);
Expand Down
25 changes: 25 additions & 0 deletions src/uct/ib/rc/base/rc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ void uct_rc_ep_get_bcopy_handler_no_completion(uct_rc_iface_send_op_t *op,
void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
const void *resp);

void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
const void *resp);

ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
unsigned flags);

Expand Down Expand Up @@ -357,6 +360,28 @@ uct_rc_txqp_add_send_comp(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp,
uct_rc_txqp_add_send_op_sn(txqp, op, sn);
}

/*
 * Queue a flush completion on a TX queue pair.
 *
 * When a user completion is supplied, an operation object is taken from the
 * interface's flush memory pool, bound to the completion and added to the
 * queue pair with the given sequence number.
 *
 * @param iface  RC interface that owns the flush memory pool.
 * @param txqp   TX queue pair the operation is added to.
 * @param comp   User completion; may be NULL, in which case nothing is queued.
 * @param sn     Sequence number passed to uct_rc_txqp_add_send_op_sn().
 *
 * @return UCS_INPROGRESS on success (also when comp is NULL),
 *         UCS_ERR_NO_MEMORY if the pool could not provide an object.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_txqp_add_flush_comp(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp,
                           uct_completion_t *comp, uint16_t sn)
{
    uct_rc_iface_send_op_t *flush_op;

    /* Nothing to track if the caller did not ask for a completion */
    if (comp == NULL) {
        return UCS_INPROGRESS;
    }

    flush_op = (uct_rc_iface_send_op_t*)ucs_mpool_get(&iface->tx.flush_mp);
    if (ucs_unlikely(flush_op == NULL)) {
        ucs_error("Failed to allocate flush completion");
        return UCS_ERR_NO_MEMORY;
    }

    flush_op->flags     = 0;
    flush_op->user_comp = comp;
    uct_rc_txqp_add_send_op_sn(txqp, flush_op, sn);
    /* handler set by mpool init */
    VALGRIND_MAKE_MEM_DEFINED(flush_op, sizeof(*flush_op));

    return UCS_INPROGRESS;
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_completion_op(uct_rc_iface_send_op_t *op, const void *resp)
{
Expand Down
37 changes: 33 additions & 4 deletions src/uct/ib/rc/base/rc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ static ucs_mpool_ops_t uct_rc_fc_pending_mpool_ops = {
.obj_cleanup = NULL
};

/*
 * Memory pool object initializer for flush completion operations.
 *
 * Presets the fields that stay the same for every flush operation: the owning
 * interface (recovered from the pool pointer, which is the tx.flush_mp member
 * of uct_rc_iface_t), cleared flags, and the flush completion handler.
 *
 * @param mp     Flush memory pool embedded in the interface.
 * @param obj    Object being initialized; treated as uct_rc_iface_send_op_t.
 * @param chunk  Not used.
 */
static void
uct_rc_iface_flush_comp_init(ucs_mpool_t *mp, void *obj, void *chunk)
{
    uct_rc_iface_send_op_t *flush_op = obj;

    flush_op->iface   = ucs_container_of(mp, uct_rc_iface_t, tx.flush_mp);
    flush_op->flags   = 0;
    flush_op->handler = uct_rc_ep_flush_op_completion_handler;
}

/*
 * Memory pool callbacks for the flush completion pool: chunks are obtained and
 * released with the ucs_mpool malloc/free chunk helpers, every object is
 * initialized by uct_rc_iface_flush_comp_init(), and no per-object cleanup
 * callback is installed.
 */
static ucs_mpool_ops_t uct_rc_flush_comp_mpool_ops = {
.chunk_alloc = ucs_mpool_chunk_malloc,
.chunk_release = ucs_mpool_chunk_free,
.obj_init = uct_rc_iface_flush_comp_init,
.obj_cleanup = NULL
};

static void uct_rc_iface_tag_query(uct_rc_iface_t *iface,
uct_iface_attr_t *iface_attr,
size_t max_inline, size_t max_iov)
Expand Down Expand Up @@ -488,6 +506,7 @@ static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface)
{
const unsigned count = iface->config.tx_ops_count;
uct_rc_iface_send_op_t *op;
ucs_status_t status;

iface->tx.ops_buffer = ucs_calloc(count, sizeof(*iface->tx.ops_buffer),
"rc_tx_ops");
Expand All @@ -496,14 +515,22 @@ static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface)
}

iface->tx.free_ops = &iface->tx.ops_buffer[0];
for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count - 1; ++op) {
for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count; ++op) {
op->handler = uct_rc_ep_send_op_completion_handler;
op->flags = UCT_RC_IFACE_SEND_OP_FLAG_IFACE;
op->iface = iface;
op->next = op + 1;
op->next = (op == (iface->tx.ops_buffer + count - 1)) ? NULL : (op + 1);
}
iface->tx.ops_buffer[count - 1].next = NULL;
return UCS_OK;

/* Create memory pool for flush completions. Can't just alloc a certain
* size buffer, because number of simultaneous flushes is not limited by
* CQ or QP resources. */
status = ucs_mpool_init(&iface->tx.flush_mp, 0, sizeof(*op), 0,
UCS_SYS_CACHE_LINE_SIZE, 256,
UINT_MAX, &uct_rc_flush_comp_mpool_ops,
"flush-comps-only");

return status;
}

static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface)
Expand All @@ -522,6 +549,8 @@ static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface)
total_count- free_count, total_count);
}
ucs_free(iface->tx.ops_buffer);

ucs_mpool_cleanup(&iface->tx.flush_mp, 1);
}

#if IBV_EXP_HW_TM
Expand Down
5 changes: 3 additions & 2 deletions src/uct/ib/rc/base/rc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ struct uct_rc_iface {
uct_ib_iface_t super;

struct {
ucs_mpool_t mp; /* pool for send descriptors */
ucs_mpool_t fc_mp; /* pool for FC grant pending requests */
ucs_mpool_t mp; /* pool for send descriptors */
ucs_mpool_t fc_mp; /* pool for FC grant pending requests */
ucs_mpool_t flush_mp; /* pool for flush completions */
/* Credits for completions.
* May be negative in case mlx5 because we take "num_bb" credits per
* post to be able to calculate credits of outstanding ops on failure.
Expand Down
10 changes: 7 additions & 3 deletions src/uct/ib/rc/verbs/rc_verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,13 @@ ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags,
}
}

uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, comp, ep->txcnt.pi);
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
return UCS_INPROGRESS;
status = uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.txqp, comp,
ep->txcnt.pi);
if (status == UCS_INPROGRESS) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
}

return status;
}

ucs_status_t uct_rc_verbs_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/common/test_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
#define ASSERT_UCS_OK_OR_INPROGRESS(_expr) \
do { \
ucs_status_t _status = (_expr); \
if ((status) != UCS_OK && (_status) != UCS_INPROGRESS) { \
if (((_status) != UCS_OK) && ((_status) != UCS_INPROGRESS)) { \
UCS_TEST_ABORT("Error: " << ucs_status_string(_status)); \
} \
} while (0)
Expand Down
7 changes: 5 additions & 2 deletions test/gtest/uct/ib/test_dc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ extern "C" {
_UCT_INSTANTIATE_TEST_CASE(_test_case, dc_mlx5)


class test_dc : public uct_test {
class test_dc : public test_rc {
public:
virtual void init() {
uct_test::init();
Expand Down Expand Up @@ -71,7 +71,6 @@ class test_dc : public uct_test {
}

protected:
entity *m_e1, *m_e2;

struct dcs_comp {
uct_completion_t uct_comp;
Expand Down Expand Up @@ -404,6 +403,10 @@ UCS_TEST_P(test_dc, dcs_ep_purge_pending) {
EXPECT_EQ(0, iface->tx.stack_top);
}

// Runs the test_iface_ops() scenario (puts interleaved with many flushes)
// as a test_dc test case.
UCS_TEST_P(test_dc, stress_iface_ops) {
test_iface_ops();
}

UCT_DC_INSTANTIATE_TEST_CASE(test_dc)


Expand Down
49 changes: 49 additions & 0 deletions test/gtest/uct/ib/test_rc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,55 @@ void test_rc::connect()
uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler, NULL, 0);
}

// Check that iface tx ops buffer and flush comp memory pool are moderated
// properly when we have communication ops + lots of flushes
void test_rc::test_iface_ops()
{
check_caps(UCT_IFACE_FLAG_PUT_ZCOPY);
int cq_len = 16;

if (UCS_OK != uct_config_modify(m_iface_config, "RC_TX_CQ_LEN",
ucs::to_string(cq_len).c_str())) {
UCS_TEST_ABORT("Error: cannot enable random DCI policy");
}

entity *e = uct_test::create_entity(0);
m_entities.push_back(e);
e->connect(0, *m_e2, 0);

mapped_buffer sendbuf(1024, 0ul, *e);
mapped_buffer recvbuf(1024, 0ul, *m_e2);
uct_completion_t comp;
comp.count = cq_len * 512; // some big value to avoid func invocation
comp.func = NULL;

UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(),
sendbuf.memh(),
m_e1->iface_attr().cap.am.max_iov);
// For _x transports several CQEs can be consumed per WQE, post less put zcopy
// ops, so that flush would be sucessfull (otherwise flush will return
// NO_RESOURCES and completion will not be added for it).
for (int i = 0; i < cq_len / 3; i++) {
ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_put_zcopy(e->ep(0), iov, iovcnt,
recvbuf.addr(),
recvbuf.rkey(), &comp));

// Create some stress on iface (flush mp):
// post 10 flushes per every put.
for (int j = 0; j < 10; j++) {
ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_flush(e->ep(0), 0, &comp));
}
}

flush();
}

// Runs the test_iface_ops() scenario (puts interleaved with many flushes)
// as a test_rc test case.
UCS_TEST_P(test_rc, stress_iface_ops) {
test_iface_ops();
}

UCT_RC_INSTANTIATE_TEST_CASE(test_rc)


class test_rc_max_wr : public test_rc {
protected:
Expand Down
2 changes: 2 additions & 0 deletions test/gtest/uct/ib/test_rc.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class test_rc : public uct_test {
uct_test::short_progress_loop(delta_ms);
}

// Stress scenario body invoked by the stress_iface_ops test cases.
void test_iface_ops();

static ucs_status_t am_dummy_handler(void *arg, void *data, size_t length,
unsigned flags) {
return UCS_OK;
Expand Down