Skip to content

Commit

Permalink
Merge pull request #3188 from brminich/topic/v1.5.x_tx_ops_fix
Browse files Browse the repository at this point in the history
UCT/RC/DC: Fix iface tx ops buffer overflow - v1.5.x
  • Loading branch information
yosefe authored Feb 8, 2019
2 parents 9cb9110 + 0a69f1a commit 5fc3f06
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 30 deletions.
19 changes: 6 additions & 13 deletions src/uct/ib/dc/accel/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,6 @@ uct_dc_mlx5_iface_atomic_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
uct_rc_txqp_add_send_op(txqp, &desc->super);
}

/*
 * Register a user completion on the TX queue pair associated with a DC mlx5
 * endpoint.
 *
 * iface - DC mlx5 interface; its nested base interface (super.super) is
 *         handed to uct_rc_txqp_add_send_comp().
 * ep    - endpoint used to look up the TX queue pair and work queue.
 * comp  - user completion forwarded to uct_rc_txqp_add_send_comp().
 */
static inline void uct_dc_mlx5_iface_add_send_comp(uct_dc_mlx5_iface_t *iface,
uct_dc_mlx5_ep_t *ep,
uct_completion_t *comp)
{
/* Presumably declares the txqp/txwq locals used below - confirm against the
 * macro definition. */
UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

/* Presumably resolves txqp/txwq for this endpoint - confirm against the
 * macro definition. */
UCT_DC_MLX5_IFACE_TXQP_GET(iface, &ep->super, txqp, txwq);
/* The work queue's sig_pi value is passed as the last argument of the call. */
uct_rc_txqp_add_send_comp(&iface->super.super, txqp, comp, txwq->sig_pi);
}

static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
uint64_t value, uint64_t remote_addr, uct_rkey_t rkey)
Expand Down Expand Up @@ -671,17 +661,20 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion
{
uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_dc_mlx5_iface_t);
uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
uct_dc_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_ep_t);
ucs_status_t status;
UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

status = uct_dc_ep_flush(tl_ep, flags, comp);
if (status == UCS_OK) {
return UCS_OK; /* all sends completed */
}

if (status == UCS_INPROGRESS) {
ucs_assert(ep->super.dci != UCT_DC_EP_NO_DCI);
uct_dc_mlx5_iface_add_send_comp(iface, ep, comp);
ucs_assert(ep->dci != UCT_DC_EP_NO_DCI);
UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
status = uct_rc_txqp_add_flush_comp(&iface->super.super, txqp, comp,
txwq->sig_pi);
}
return status;
}
Expand Down
9 changes: 7 additions & 2 deletions src/uct/ib/dc/verbs/dc_verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -567,16 +567,21 @@ ucs_status_t uct_dc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completio
uct_dc_verbs_iface_t);
uct_dc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_verbs_ep_t);
ucs_status_t status;
uint8_t dci;

status = uct_dc_ep_flush(tl_ep, flags, comp);
if (status == UCS_OK) {
return UCS_OK; /* all sends completed */
}

if (status == UCS_INPROGRESS) {
ucs_assert(ep->super.dci != UCT_DC_EP_NO_DCI);
uct_dc_verbs_iface_add_send_comp(iface, ep, comp);
dci = ep->super.dci;
ucs_assert(dci != UCT_DC_EP_NO_DCI);
status = uct_rc_txqp_add_flush_comp(&iface->super.super,
&iface->super.tx.dcis[dci].txqp,
comp, iface->dcis_txcnt[dci].pi);
}

return status;
}

Expand Down
9 changes: 6 additions & 3 deletions src/uct/ib/rc/accel/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,12 @@ ucs_status_t uct_rc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
sn = ep->tx.wq.sig_pi;
}

uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, comp, sn);
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
return UCS_INPROGRESS;
status = uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.txqp, comp, sn);
if (status == UCS_INPROGRESS) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
}

return status;
}

ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
Expand Down
9 changes: 9 additions & 0 deletions src/uct/ib/rc/base/rc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@ void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
uct_rc_iface_put_send_op(op);
}

/*
 * Completion handler for flush operations.
 *
 * Invokes the user completion stored in the operation with UCS_OK and then
 * hands the operation object back via ucs_mpool_put().
 *
 * op   - flush operation being completed; op->user_comp is the completion
 *        that gets invoked.
 * resp - not referenced by this handler.
 */
void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
const void *resp)
{
uct_invoke_completion(op->user_comp, UCS_OK);
/* Release only after the user completion has been invoked, since op is
 * still read by the call above. */
ucs_mpool_put(op);
}

ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
unsigned flags)
{
Expand Down Expand Up @@ -501,6 +508,8 @@ void uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
op->flags &= ~UCT_RC_IFACE_SEND_OP_FLAG_INUSE;
if (op->handler == uct_rc_ep_send_op_completion_handler) {
uct_rc_iface_put_send_op(op);
} else if (op->handler == uct_rc_ep_flush_op_completion_handler) {
ucs_mpool_put(op);
} else {
desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);
ucs_mpool_put(desc);
Expand Down
25 changes: 25 additions & 0 deletions src/uct/ib/rc/base/rc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ void uct_rc_ep_get_bcopy_handler_no_completion(uct_rc_iface_send_op_t *op,
void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
const void *resp);

/*
 * Completion handler for flush operations.
 *
 * op   - flush operation being completed.
 * resp - response data pointer passed to the handler.
 */
void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
const void *resp);

ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
unsigned flags);

Expand Down Expand Up @@ -357,6 +360,28 @@ uct_rc_txqp_add_send_comp(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp,
uct_rc_txqp_add_send_op_sn(txqp, op, sn);
}

/*
 * Attach a flush completion to a TX queue pair.
 *
 * When a user completion is supplied, a flush operation object is taken from
 * the interface's flush memory pool, bound to that completion and queued on
 * the TX queue pair under the given sequence number. When no completion is
 * supplied nothing is queued.
 *
 * iface - interface that owns the flush memory pool (tx.flush_mp).
 * txqp  - TX queue pair on which the operation is queued.
 * comp  - user completion, may be NULL.
 * sn    - sequence number passed to uct_rc_txqp_add_send_op_sn().
 *
 * Returns UCS_INPROGRESS, or UCS_ERR_NO_MEMORY if the pool could not provide
 * an operation object.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_txqp_add_flush_comp(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp,
                           uct_completion_t *comp, uint16_t sn)
{
    uct_rc_iface_send_op_t *flush_op;

    /* Nothing to track when the caller did not ask for a notification */
    if (comp == NULL) {
        return UCS_INPROGRESS;
    }

    flush_op = (uct_rc_iface_send_op_t*)ucs_mpool_get(&iface->tx.flush_mp);
    if (ucs_unlikely(flush_op == NULL)) {
        ucs_error("Failed to allocate flush completion");
        return UCS_ERR_NO_MEMORY;
    }

    flush_op->flags     = 0;
    flush_op->user_comp = comp;
    uct_rc_txqp_add_send_op_sn(txqp, flush_op, sn);

    /* handler set by mpool init */
    VALGRIND_MAKE_MEM_DEFINED(flush_op, sizeof(*flush_op));

    return UCS_INPROGRESS;
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_completion_op(uct_rc_iface_send_op_t *op, const void *resp)
{
Expand Down
37 changes: 33 additions & 4 deletions src/uct/ib/rc/base/rc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ static ucs_mpool_ops_t uct_rc_fc_pending_mpool_ops = {
.obj_cleanup = NULL
};

/*
 * Object-init callback of the flush completion memory pool.
 *
 * Prepares every pool object as a flush operation: clears its flags, sets
 * the flush completion handler and records the interface that embeds the
 * pool.
 *
 * mp    - memory pool being populated; it is the tx.flush_mp member of a
 *         uct_rc_iface_t, which is how the owning interface is recovered.
 * obj   - object to initialize, treated as a uct_rc_iface_send_op_t.
 * chunk - not referenced.
 */
static void
uct_rc_iface_flush_comp_init(ucs_mpool_t *mp, void *obj, void *chunk)
{
    uct_rc_iface_send_op_t *flush_op = obj;

    flush_op->iface   = ucs_container_of(mp, uct_rc_iface_t, tx.flush_mp);
    flush_op->flags   = 0;
    flush_op->handler = uct_rc_ep_flush_op_completion_handler;
}

/*
 * Callback table for the flush completion memory pool: every object is set
 * up by uct_rc_iface_flush_comp_init() and no per-object cleanup callback is
 * registered.
 */
static ucs_mpool_ops_t uct_rc_flush_comp_mpool_ops = {
.chunk_alloc = ucs_mpool_chunk_malloc,
.chunk_release = ucs_mpool_chunk_free,
.obj_init = uct_rc_iface_flush_comp_init,
.obj_cleanup = NULL
};

static void uct_rc_iface_tag_query(uct_rc_iface_t *iface,
uct_iface_attr_t *iface_attr,
size_t max_inline, size_t max_iov)
Expand Down Expand Up @@ -488,6 +506,7 @@ static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface)
{
const unsigned count = iface->config.tx_ops_count;
uct_rc_iface_send_op_t *op;
ucs_status_t status;

iface->tx.ops_buffer = ucs_calloc(count, sizeof(*iface->tx.ops_buffer),
"rc_tx_ops");
Expand All @@ -496,14 +515,22 @@ static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface)
}

iface->tx.free_ops = &iface->tx.ops_buffer[0];
for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count - 1; ++op) {
for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count; ++op) {
op->handler = uct_rc_ep_send_op_completion_handler;
op->flags = UCT_RC_IFACE_SEND_OP_FLAG_IFACE;
op->iface = iface;
op->next = op + 1;
op->next = (op == (iface->tx.ops_buffer + count - 1)) ? NULL : (op + 1);
}
iface->tx.ops_buffer[count - 1].next = NULL;
return UCS_OK;

/* Create memory pool for flush completions. Can't just alloc a certain
* size buffer, because number of simultaneous flushes is not limited by
* CQ or QP resources. */
status = ucs_mpool_init(&iface->tx.flush_mp, 0, sizeof(*op), 0,
UCS_SYS_CACHE_LINE_SIZE, 256,
UINT_MAX, &uct_rc_flush_comp_mpool_ops,
"flush-comps-only");

return status;
}

static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface)
Expand All @@ -522,6 +549,8 @@ static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface)
total_count- free_count, total_count);
}
ucs_free(iface->tx.ops_buffer);

ucs_mpool_cleanup(&iface->tx.flush_mp, 1);
}

#if IBV_EXP_HW_TM
Expand Down
5 changes: 3 additions & 2 deletions src/uct/ib/rc/base/rc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ struct uct_rc_iface {
uct_ib_iface_t super;

struct {
ucs_mpool_t mp; /* pool for send descriptors */
ucs_mpool_t fc_mp; /* pool for FC grant pending requests */
ucs_mpool_t mp; /* pool for send descriptors */
ucs_mpool_t fc_mp; /* pool for FC grant pending requests */
ucs_mpool_t flush_mp; /* pool for flush completions */
/* Credits for completions.
* May be negative in case mlx5 because we take "num_bb" credits per
* post to be able to calculate credits of outstanding ops on failure.
Expand Down
10 changes: 7 additions & 3 deletions src/uct/ib/rc/verbs/rc_verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,13 @@ ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags,
}
}

uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, comp, ep->txcnt.pi);
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
return UCS_INPROGRESS;
status = uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.txqp, comp,
ep->txcnt.pi);
if (status == UCS_INPROGRESS) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super);
}

return status;
}

ucs_status_t uct_rc_verbs_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/common/test_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
#define ASSERT_UCS_OK_OR_INPROGRESS(_expr) \
do { \
ucs_status_t _status = (_expr); \
if ((status) != UCS_OK && (_status) != UCS_INPROGRESS) { \
if (((_status) != UCS_OK) && ((_status) != UCS_INPROGRESS)) { \
UCS_TEST_ABORT("Error: " << ucs_status_string(_status)); \
} \
} while (0)
Expand Down
7 changes: 5 additions & 2 deletions test/gtest/uct/ib/test_dc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ extern "C" {
_UCT_INSTANTIATE_TEST_CASE(_test_case, dc_mlx5)


class test_dc : public uct_test {
class test_dc : public test_rc {
public:
virtual void init() {
uct_test::init();
Expand Down Expand Up @@ -71,7 +71,6 @@ class test_dc : public uct_test {
}

protected:
entity *m_e1, *m_e2;

struct dcs_comp {
uct_completion_t uct_comp;
Expand Down Expand Up @@ -404,6 +403,10 @@ UCS_TEST_P(test_dc, dcs_ep_purge_pending) {
EXPECT_EQ(0, iface->tx.stack_top);
}

/* Runs the test_iface_ops() scenario on the test_dc fixture. */
UCS_TEST_P(test_dc, stress_iface_ops) {
test_iface_ops();
}

UCT_DC_INSTANTIATE_TEST_CASE(test_dc)


Expand Down
49 changes: 49 additions & 0 deletions test/gtest/uct/ib/test_rc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,55 @@ void test_rc::connect()
uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler, NULL, 0);
}

// Check that iface tx ops buffer and flush comp memory pool are moderated
// properly when we have communication ops + lots of flushes
void test_rc::test_iface_ops()
{
check_caps(UCT_IFACE_FLAG_PUT_ZCOPY);
int cq_len = 16;

if (UCS_OK != uct_config_modify(m_iface_config, "RC_TX_CQ_LEN",
ucs::to_string(cq_len).c_str())) {
UCS_TEST_ABORT("Error: cannot enable random DCI policy");
}

entity *e = uct_test::create_entity(0);
m_entities.push_back(e);
e->connect(0, *m_e2, 0);

mapped_buffer sendbuf(1024, 0ul, *e);
mapped_buffer recvbuf(1024, 0ul, *m_e2);
uct_completion_t comp;
comp.count = cq_len * 512; // some big value to avoid func invocation
comp.func = NULL;

UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(),
sendbuf.memh(),
m_e1->iface_attr().cap.am.max_iov);
// For _x transports several CQEs can be consumed per WQE, post less put zcopy
// ops, so that flush would be sucessfull (otherwise flush will return
// NO_RESOURCES and completion will not be added for it).
for (int i = 0; i < cq_len / 3; i++) {
ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_put_zcopy(e->ep(0), iov, iovcnt,
recvbuf.addr(),
recvbuf.rkey(), &comp));

// Create some stress on iface (flush mp):
// post 10 flushes per every put.
for (int j = 0; j < 10; j++) {
ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_flush(e->ep(0), 0, &comp));
}
}

flush();
}

// Runs the test_iface_ops() scenario on the test_rc fixture.
UCS_TEST_P(test_rc, stress_iface_ops) {
test_iface_ops();
}

UCT_RC_INSTANTIATE_TEST_CASE(test_rc)


class test_rc_max_wr : public test_rc {
protected:
Expand Down
2 changes: 2 additions & 0 deletions test/gtest/uct/ib/test_rc.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class test_rc : public uct_test {
uct_test::short_progress_loop(delta_ms);
}

// Stress scenario shared by derived test fixtures; takes no arguments and
// returns nothing.
void test_iface_ops();

static ucs_status_t am_dummy_handler(void *arg, void *data, size_t length,
unsigned flags) {
return UCS_OK;
Expand Down

0 comments on commit 5fc3f06

Please sign in to comment.