diff --git a/src/uct/ib/dc/accel/dc_mlx5.c b/src/uct/ib/dc/accel/dc_mlx5.c index b1a6de312f4..6351ef39764 100644 --- a/src/uct/ib/dc/accel/dc_mlx5.c +++ b/src/uct/ib/dc/accel/dc_mlx5.c @@ -236,16 +236,6 @@ uct_dc_mlx5_iface_atomic_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep, uct_rc_txqp_add_send_op(txqp, &desc->super); } -static inline void uct_dc_mlx5_iface_add_send_comp(uct_dc_mlx5_iface_t *iface, - uct_dc_mlx5_ep_t *ep, - uct_completion_t *comp) -{ - UCT_DC_MLX5_TXQP_DECL(txqp, txwq); - - UCT_DC_MLX5_IFACE_TXQP_GET(iface, &ep->super, txqp, txwq); - uct_rc_txqp_add_send_comp(&iface->super.super, txqp, comp, txwq->sig_pi); -} - static ucs_status_t UCS_F_ALWAYS_INLINE uct_dc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep, unsigned opcode, unsigned size, uint64_t value, uint64_t remote_addr, uct_rkey_t rkey) @@ -657,8 +647,9 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion { uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); - uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); + uct_dc_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_ep_t); ucs_status_t status; + UCT_DC_MLX5_TXQP_DECL(txqp, txwq); status = uct_dc_ep_flush(tl_ep, flags, comp); if (status == UCS_OK) { @@ -666,8 +657,10 @@ ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion } if (status == UCS_INPROGRESS) { - ucs_assert(ep->super.dci != UCT_DC_EP_NO_DCI); - uct_dc_mlx5_iface_add_send_comp(iface, ep, comp); + ucs_assert(ep->dci != UCT_DC_EP_NO_DCI); + UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq); + status = uct_rc_txqp_add_flush_comp(&iface->super.super, txqp, comp, + txwq->sig_pi); } return status; } diff --git a/src/uct/ib/dc/verbs/dc_verbs.c b/src/uct/ib/dc/verbs/dc_verbs.c index 0ee2da7800c..dfac71622d9 100644 --- a/src/uct/ib/dc/verbs/dc_verbs.c +++ b/src/uct/ib/dc/verbs/dc_verbs.c @@ -567,6 +567,7 @@ ucs_status_t uct_dc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completio 
uct_dc_verbs_iface_t); uct_dc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_verbs_ep_t); ucs_status_t status; + uint8_t dci; status = uct_dc_ep_flush(tl_ep, flags, comp); if (status == UCS_OK) { @@ -574,9 +575,13 @@ ucs_status_t uct_dc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completio } if (status == UCS_INPROGRESS) { - ucs_assert(ep->super.dci != UCT_DC_EP_NO_DCI); - uct_dc_verbs_iface_add_send_comp(iface, ep, comp); + dci = ep->super.dci; + ucs_assert(dci != UCT_DC_EP_NO_DCI); + status = uct_rc_txqp_add_flush_comp(&iface->super.super, + &iface->super.tx.dcis[dci].txqp, + comp, iface->dcis_txcnt[dci].pi); } + return status; } diff --git a/src/uct/ib/rc/accel/rc_mlx5_ep.c b/src/uct/ib/rc/accel/rc_mlx5_ep.c index 656975dce98..5c6599cb46b 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_ep.c +++ b/src/uct/ib/rc/accel/rc_mlx5_ep.c @@ -534,9 +534,12 @@ ucs_status_t uct_rc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags, sn = ep->tx.wq.sig_pi; } - uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, comp, sn); - UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super); - return UCS_INPROGRESS; + status = uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.txqp, comp, sn); + if (status == UCS_INPROGRESS) { + UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super); + } + + return status; } ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, diff --git a/src/uct/ib/rc/base/rc_ep.c b/src/uct/ib/rc/base/rc_ep.c index 4b36befd952..71f09e02e75 100644 --- a/src/uct/ib/rc/base/rc_ep.c +++ b/src/uct/ib/rc/base/rc_ep.c @@ -369,6 +369,13 @@ void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op, uct_rc_iface_put_send_op(op); } +void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op, + const void *resp) +{ + uct_invoke_completion(op->user_comp, UCS_OK); + ucs_mpool_put(op); +} + ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n, unsigned flags) { @@ -501,6 +508,8 @@ void uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t 
status, op->flags &= ~UCT_RC_IFACE_SEND_OP_FLAG_INUSE; if (op->handler == uct_rc_ep_send_op_completion_handler) { uct_rc_iface_put_send_op(op); + } else if (op->handler == uct_rc_ep_flush_op_completion_handler) { + ucs_mpool_put(op); } else { desc = ucs_derived_of(op, uct_rc_iface_send_desc_t); ucs_mpool_put(desc); diff --git a/src/uct/ib/rc/base/rc_ep.h b/src/uct/ib/rc/base/rc_ep.h index 3d9b955c92b..2e303ddb48d 100644 --- a/src/uct/ib/rc/base/rc_ep.h +++ b/src/uct/ib/rc/base/rc_ep.h @@ -242,6 +242,9 @@ void uct_rc_ep_get_bcopy_handler_no_completion(uct_rc_iface_send_op_t *op, void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op, const void *resp); +void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op, + const void *resp); + ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n, unsigned flags); @@ -357,6 +360,28 @@ uct_rc_txqp_add_send_comp(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, uct_rc_txqp_add_send_op_sn(txqp, op, sn); } +static UCS_F_ALWAYS_INLINE ucs_status_t +uct_rc_txqp_add_flush_comp(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, + uct_completion_t *comp, uint16_t sn) +{ + uct_rc_iface_send_op_t *op; + + if (comp != NULL) { + op = (uct_rc_iface_send_op_t*)ucs_mpool_get(&iface->tx.flush_mp); + if (ucs_unlikely(op == NULL)) { + ucs_error("Failed to allocate flush completion"); + return UCS_ERR_NO_MEMORY; + } + + op->flags = 0; + op->user_comp = comp; + uct_rc_txqp_add_send_op_sn(txqp, op, sn); + VALGRIND_MAKE_MEM_DEFINED(op, sizeof(*op)); /* handler set by mpool init */ + } + + return UCS_INPROGRESS; +} + static UCS_F_ALWAYS_INLINE void uct_rc_txqp_completion_op(uct_rc_iface_send_op_t *op, const void *resp) { diff --git a/src/uct/ib/rc/base/rc_iface.c b/src/uct/ib/rc/base/rc_iface.c index 07e43792f03..bc2f4493028 100644 --- a/src/uct/ib/rc/base/rc_iface.c +++ b/src/uct/ib/rc/base/rc_iface.c @@ -135,6 +135,24 @@ static ucs_mpool_ops_t uct_rc_fc_pending_mpool_ops = { .obj_cleanup = NULL }; +static void 
+uct_rc_iface_flush_comp_init(ucs_mpool_t *mp, void *obj, void *chunk) +{ + uct_rc_iface_t *iface = ucs_container_of(mp, uct_rc_iface_t, tx.flush_mp); + uct_rc_iface_send_op_t *op = obj; + + op->handler = uct_rc_ep_flush_op_completion_handler; + op->flags = 0; + op->iface = iface; +} + +static ucs_mpool_ops_t uct_rc_flush_comp_mpool_ops = { + .chunk_alloc = ucs_mpool_chunk_malloc, + .chunk_release = ucs_mpool_chunk_free, + .obj_init = uct_rc_iface_flush_comp_init, + .obj_cleanup = NULL +}; + static void uct_rc_iface_tag_query(uct_rc_iface_t *iface, uct_iface_attr_t *iface_attr, size_t max_inline, size_t max_iov) @@ -488,6 +506,7 @@ static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface) { const unsigned count = iface->config.tx_ops_count; uct_rc_iface_send_op_t *op; + ucs_status_t status; iface->tx.ops_buffer = ucs_calloc(count, sizeof(*iface->tx.ops_buffer), "rc_tx_ops"); @@ -496,14 +515,22 @@ static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface) } iface->tx.free_ops = &iface->tx.ops_buffer[0]; - for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count - 1; ++op) { + for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count; ++op) { op->handler = uct_rc_ep_send_op_completion_handler; op->flags = UCT_RC_IFACE_SEND_OP_FLAG_IFACE; op->iface = iface; - op->next = op + 1; + op->next = (op == (iface->tx.ops_buffer + count - 1)) ? NULL : (op + 1); } - iface->tx.ops_buffer[count - 1].next = NULL; - return UCS_OK; + + /* Create memory pool for flush completions. Can't just alloc a certain + * size buffer, because number of simultaneous flushes is not limited by + * CQ or QP resources. 
*/ + status = ucs_mpool_init(&iface->tx.flush_mp, 0, sizeof(*op), 0, + UCS_SYS_CACHE_LINE_SIZE, 256, + UINT_MAX, &uct_rc_flush_comp_mpool_ops, + "flush-comps-only"); + + return status; } static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface) @@ -522,6 +549,8 @@ static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface) total_count- free_count, total_count); } ucs_free(iface->tx.ops_buffer); + + ucs_mpool_cleanup(&iface->tx.flush_mp, 1); } #if IBV_EXP_HW_TM diff --git a/src/uct/ib/rc/base/rc_iface.h b/src/uct/ib/rc/base/rc_iface.h index d555fcf5983..cd8dbc9c5ce 100644 --- a/src/uct/ib/rc/base/rc_iface.h +++ b/src/uct/ib/rc/base/rc_iface.h @@ -228,8 +228,9 @@ struct uct_rc_iface { uct_ib_iface_t super; struct { - ucs_mpool_t mp; /* pool for send descriptors */ - ucs_mpool_t fc_mp; /* pool for FC grant pending requests */ + ucs_mpool_t mp; /* pool for send descriptors */ + ucs_mpool_t fc_mp; /* pool for FC grant pending requests */ + ucs_mpool_t flush_mp; /* pool for flush completions */ /* Credits for completions. * May be negative in case mlx5 because we take "num_bb" credits per * post to be able to calculate credits of outstanding ops on failure. 
diff --git a/src/uct/ib/rc/verbs/rc_verbs_ep.c b/src/uct/ib/rc/verbs/rc_verbs_ep.c index e3a0b31372f..d8abee6feaf 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_ep.c +++ b/src/uct/ib/rc/verbs/rc_verbs_ep.c @@ -521,9 +521,13 @@ ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags, } } - uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, comp, ep->txcnt.pi); - UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super); - return UCS_INPROGRESS; + status = uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.txqp, comp, + ep->txcnt.pi); + if (status == UCS_INPROGRESS) { + UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super.super); + } + + return status; } ucs_status_t uct_rc_verbs_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, diff --git a/test/gtest/common/test_helpers.h b/test/gtest/common/test_helpers.h index a2493427c6f..afcfa24c065 100644 --- a/test/gtest/common/test_helpers.h +++ b/test/gtest/common/test_helpers.h @@ -78,7 +78,7 @@ #define ASSERT_UCS_OK_OR_INPROGRESS(_expr) \ do { \ ucs_status_t _status = (_expr); \ - if ((status) != UCS_OK && (_status) != UCS_INPROGRESS) { \ + if (((_status) != UCS_OK) && ((_status) != UCS_INPROGRESS)) { \ UCS_TEST_ABORT("Error: " << ucs_status_string(_status)); \ } \ } while (0) diff --git a/test/gtest/uct/ib/test_dc.cc b/test/gtest/uct/ib/test_dc.cc index 83612f7f7fc..cb105dafda5 100644 --- a/test/gtest/uct/ib/test_dc.cc +++ b/test/gtest/uct/ib/test_dc.cc @@ -22,7 +22,7 @@ extern "C" { _UCT_INSTANTIATE_TEST_CASE(_test_case, dc_mlx5) -class test_dc : public uct_test { +class test_dc : public test_rc { public: virtual void init() { uct_test::init(); @@ -71,7 +71,6 @@ class test_dc : public uct_test { } protected: - entity *m_e1, *m_e2; struct dcs_comp { uct_completion_t uct_comp; @@ -404,6 +403,10 @@ UCS_TEST_P(test_dc, dcs_ep_purge_pending) { EXPECT_EQ(0, iface->tx.stack_top); } +UCS_TEST_P(test_dc, stress_iface_ops) { + test_iface_ops(); +} + UCT_DC_INSTANTIATE_TEST_CASE(test_dc) diff --git a/test/gtest/uct/ib/test_rc.cc 
b/test/gtest/uct/ib/test_rc.cc index 9ad5f670073..8b336e720f3 100644 --- a/test/gtest/uct/ib/test_rc.cc +++ b/test/gtest/uct/ib/test_rc.cc @@ -35,6 +35,55 @@ void test_rc::connect() uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler, NULL, 0); } +// Check that iface tx ops buffer and flush comp memory pool are moderated +// properly when we have communication ops + lots of flushes +void test_rc::test_iface_ops() +{ + check_caps(UCT_IFACE_FLAG_PUT_ZCOPY); + int cq_len = 16; + + if (UCS_OK != uct_config_modify(m_iface_config, "RC_TX_CQ_LEN", + ucs::to_string(cq_len).c_str())) { + UCS_TEST_ABORT("Error: cannot enable random DCI policy"); + } + + entity *e = uct_test::create_entity(0); + m_entities.push_back(e); + e->connect(0, *m_e2, 0); + + mapped_buffer sendbuf(1024, 0ul, *e); + mapped_buffer recvbuf(1024, 0ul, *m_e2); + uct_completion_t comp; + comp.count = cq_len * 512; // some big value to avoid func invocation + comp.func = NULL; + + UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(), + sendbuf.memh(), + m_e1->iface_attr().cap.am.max_iov); + // For _x transports several CQEs can be consumed per WQE, post fewer put zcopy + // ops, so that flush would be successful (otherwise flush will return + // NO_RESOURCES and completion will not be added for it). + for (int i = 0; i < cq_len / 3; i++) { + ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_put_zcopy(e->ep(0), iov, iovcnt, + recvbuf.addr(), + recvbuf.rkey(), &comp)); + + // Create some stress on iface (flush mp): + // post 10 flushes per every put. 
+ for (int j = 0; j < 10; j++) { + ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_flush(e->ep(0), 0, &comp)); + } + } + + flush(); +} + +UCS_TEST_P(test_rc, stress_iface_ops) { + test_iface_ops(); +} + +UCT_RC_INSTANTIATE_TEST_CASE(test_rc) + class test_rc_max_wr : public test_rc { protected: diff --git a/test/gtest/uct/ib/test_rc.h b/test/gtest/uct/ib/test_rc.h index d4437035fe9..c5cd27174c7 100644 --- a/test/gtest/uct/ib/test_rc.h +++ b/test/gtest/uct/ib/test_rc.h @@ -41,6 +41,8 @@ class test_rc : public uct_test { uct_test::short_progress_loop(delta_ms); } + void test_iface_ops(); + static ucs_status_t am_dummy_handler(void *arg, void *data, size_t length, unsigned flags) { return UCS_OK;