diff --git a/src/uct/ib/dc/dc_mlx5.c b/src/uct/ib/dc/dc_mlx5.c index 32639d9a146..42e14b2032a 100644 --- a/src/uct/ib/dc/dc_mlx5.c +++ b/src/uct/ib/dc/dc_mlx5.c @@ -224,15 +224,18 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface) ucs_trace_poll("dc iface %p tx_cqe: dci[%d] qpn 0x%x txqp %p hw_ci %d", iface, dci, qp_num, txqp, hw_ci); + uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci); + uct_rc_txqp_available_set(txqp, uct_ib_mlx5_txwq_update_bb(txwq, hw_ci)); ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max); + uct_rc_iface_update_reads(&iface->super.super); + uct_dc_mlx5_iface_dci_put(iface, dci); /* process pending elements prior to CQ entries to * avoid out-of-order transmission in completion * callbacks */ uct_dc_mlx5_iface_progress_pending(iface); - uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci); return 1; } @@ -377,7 +380,7 @@ static ucs_status_t uct_dc_mlx5_iface_create_qp(uct_dc_mlx5_iface_t *iface, return UCS_OK; err: - uct_rc_txqp_cleanup(&dci->txqp); + uct_rc_txqp_cleanup(&iface->super.super, &dci->txqp); err_qp: ibv_destroy_qp(dci->txwq.super.verbs.qp); return status; @@ -767,7 +770,7 @@ void uct_dc_mlx5_iface_dcis_destroy(uct_dc_mlx5_iface_t *iface, int max) { int i; for (i = 0; i < max; i++) { - uct_rc_txqp_cleanup(&iface->tx.dcis[i].txqp); + uct_rc_txqp_cleanup(&iface->super.super, &iface->tx.dcis[i].txqp); ucs_assert(iface->tx.dcis[i].txwq.super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS); uct_ib_destroy_qp(iface->tx.dcis[i].txwq.super.verbs.qp); } diff --git a/src/uct/ib/dc/dc_mlx5_ep.c b/src/uct/ib/dc/dc_mlx5_ep.c index 590357efe6c..d7f9bdd7afa 100644 --- a/src/uct/ib/dc/dc_mlx5_ep.c +++ b/src/uct/ib/dc/dc_mlx5_ep.c @@ -935,7 +935,9 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t) ucs_debug("ep (%p) is destroyed with %d outstanding ops", self, (int16_t)iface->super.super.config.tx_qp_len - uct_rc_txqp_available(&iface->tx.dcis[self->dci].txqp)); - uct_rc_txqp_purge_outstanding(&iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED, 1); + uct_rc_txqp_purge_outstanding(&iface->super.super, + &iface->tx.dcis[self->dci].txqp, + UCS_ERR_CANCELED, 1); iface->tx.dcis[self->dci].ep = NULL; } @@ -1281,7 +1283,7 @@ void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg, ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface)); - uct_rc_txqp_purge_outstanding(txqp, ep_status, 0); + uct_rc_txqp_purge_outstanding(&iface->super.super, txqp, ep_status, 0); /* poll_cqe for mlx5 returns NULL in case of failure and the cq_avaialble is not updated for the error cqe and all outstanding wqes*/ diff --git a/src/uct/ib/dc/dc_mlx5_ep.h b/src/uct/ib/dc/dc_mlx5_ep.h index cabeca69b66..f223b8c8988 100644 --- a/src/uct/ib/dc/dc_mlx5_ep.h +++ b/src/uct/ib/dc/dc_mlx5_ep.h @@ -553,6 +553,7 @@ static inline struct mlx5_grh_av *uct_dc_mlx5_ep_get_grh(uct_dc_mlx5_ep_t *ep) * available TX resources. */ #define UCT_DC_CHECK_RES_AND_FC(_iface, _ep) \ { \ + UCT_RC_CHECK_NUM_RDMA_READ(&(_iface)->super.super) \ if (ucs_unlikely((_ep)->fc.fc_wnd <= \ (_iface)->super.super.config.fc_hard_thresh)) { \ ucs_status_t _status = uct_dc_mlx5_ep_check_fc(_iface, _ep); \ diff --git a/src/uct/ib/rc/accel/rc_mlx5.inl b/src/uct/ib/rc/accel/rc_mlx5.inl index 366705d711d..d7fe3ec504a 100644 --- a/src/uct/ib/rc/accel/rc_mlx5.inl +++ b/src/uct/ib/rc/accel/rc_mlx5.inl @@ -34,6 +34,8 @@ uct_rc_mlx5_common_update_tx_res(uct_rc_iface_t *rc_iface, uct_ib_mlx5_txwq_t *t uct_rc_txqp_available_add(txqp, bb_num); ucs_assert(uct_rc_txqp_available(txqp) <= txwq->bb_max); + uct_rc_iface_update_reads(rc_iface); + rc_iface->tx.cq_available += bb_num; ucs_assertv(rc_iface->tx.cq_available <= rc_iface->config.tx_cq_len, "cq_available=%d tx_cq_len=%d bb_num=%d txwq=%p txqp=%p", diff --git a/src/uct/ib/rc/accel/rc_mlx5_ep.c b/src/uct/ib/rc/accel/rc_mlx5_ep.c index 6cf688b0e9b..7740763e70f 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_ep.c +++ b/src/uct/ib/rc/accel/rc_mlx5_ep.c @@ -1016,7 +1016,8 @@ ucs_status_t uct_rc_mlx5_ep_handle_failure(uct_rc_mlx5_ep_t *ep, uct_rc_mlx5_iface_common_t); int iface_res_released; - iface_res_released = uct_rc_txqp_purge_outstanding(&ep->super.txqp, status, 0); + iface_res_released = uct_rc_txqp_purge_outstanding(&iface->super, + &ep->super.txqp, status, 0); if (cqe != NULL) { uct_rc_mlx5_common_update_tx_res(&iface->super, &ep->tx.wq, diff --git a/src/uct/ib/rc/accel/rc_mlx5_iface.c b/src/uct/ib/rc/accel/rc_mlx5_iface.c index 85d8d194a88..261c5f3c8af 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_iface.c +++ b/src/uct/ib/rc/accel/rc_mlx5_iface.c @@ -126,6 +126,8 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_common_t *iface) ucs_trace_poll("rc_mlx5 iface %p tx_cqe: ep %p qpn 0x%x hw_ci %d", iface, ep, qp_num, hw_ci); + uct_rc_mlx5_txqp_process_tx_cqe(&ep->super.txqp, cqe, hw_ci); + uct_rc_mlx5_common_update_tx_res(&iface->super, &ep->tx.wq, &ep->super.txqp, hw_ci); @@ -135,7 +137,6 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_common_t *iface) ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending, NULL); - uct_rc_mlx5_txqp_process_tx_cqe(&ep->super.txqp, cqe, hw_ci); ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending, NULL); diff --git a/src/uct/ib/rc/base/rc_ep.c b/src/uct/ib/rc/base/rc_ep.c index 20fc95f02fc..404c837eac3 100644 --- a/src/uct/ib/rc/base/rc_ep.c +++ b/src/uct/ib/rc/base/rc_ep.c @@ -60,9 +60,9 @@ ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface, stats_parent, "-0x%x", qp_num); } -void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp) +void uct_rc_txqp_cleanup(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp) { - uct_rc_txqp_purge_outstanding(txqp, UCS_ERR_CANCELED, 1); + uct_rc_txqp_purge_outstanding(iface, txqp, UCS_ERR_CANCELED, 1); UCS_STATS_NODE_FREE(txqp->stats); } @@ -123,18 +123,20 @@ UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface, uint32_t qp_num, return UCS_OK; err_txqp_cleanup: - uct_rc_txqp_cleanup(&self->txqp); + uct_rc_txqp_cleanup(iface, &self->txqp); return status; } static UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t) { + uct_rc_iface_t *iface = ucs_derived_of(self->super.super.iface, + uct_rc_iface_t); ucs_debug("destroy rc ep %p", self); ucs_list_del(&self->list); uct_rc_ep_pending_purge(&self->super.super, NULL, NULL); uct_rc_fc_cleanup(&self->fc); - uct_rc_txqp_cleanup(&self->txqp); + uct_rc_txqp_cleanup(iface, &self->txqp); } UCS_CLASS_DEFINE(uct_rc_ep_t, uct_base_ep_t) @@ -169,13 +171,13 @@ uct_rc_op_release_iface_resources(uct_rc_iface_send_op_t *op, int is_get_zcopy) uct_rc_iface_t *iface; if (is_get_zcopy) { - op->iface->tx.reads_available += op->length; + op->iface->tx.reads_completed += op->length; return; } desc = ucs_derived_of(op, uct_rc_iface_send_desc_t); iface = ucs_container_of(ucs_mpool_obj_owner(desc), uct_rc_iface_t, tx.mp); - iface->tx.reads_available += op->length; + iface->tx.reads_completed += op->length; } void uct_rc_ep_get_bcopy_handler(uct_rc_iface_send_op_t *op, const void *resp) @@ -337,8 +339,8 @@ ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self) return status; } -int uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status, - int is_log) +int uct_rc_txqp_purge_outstanding(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, + ucs_status_t status, int is_log) { uct_rc_iface_send_op_t *op; uct_rc_iface_send_desc_t *desc; @@ -365,9 +367,11 @@ int uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status, if ((op->handler == uct_rc_ep_get_bcopy_handler) || (op->handler == uct_rc_ep_get_bcopy_handler_no_completion)) { uct_rc_op_release_iface_resources(op, 0); + uct_rc_iface_update_reads(iface); iface_resources_released = 1; } else if (op->handler == uct_rc_ep_get_zcopy_completion_handler) { uct_rc_op_release_iface_resources(op, 1); + uct_rc_iface_update_reads(iface); iface_resources_released = 1; } } diff --git a/src/uct/ib/rc/base/rc_ep.h b/src/uct/ib/rc/base/rc_ep.h index 3dac18d7611..dcfbe0071d8 100644 --- a/src/uct/ib/rc/base/rc_ep.h +++ b/src/uct/ib/rc/base/rc_ep.h @@ -247,8 +247,8 @@ void uct_rc_fc_cleanup(uct_rc_fc_t *fc); ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self); -int uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status, - int is_log); +int uct_rc_txqp_purge_outstanding(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, + ucs_status_t status, int is_log); ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available, unsigned flags); @@ -267,7 +267,7 @@ void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(64, 1)(uct_rc_iface_send_op_t *op, ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface, uint32_t qp_num UCS_STATS_ARG(ucs_stats_node_t* stats_parent)); -void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp); +void uct_rc_txqp_cleanup(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp); static inline int16_t uct_rc_txqp_available(uct_rc_txqp_t *txqp) { diff --git a/src/uct/ib/rc/base/rc_iface.c b/src/uct/ib/rc/base/rc_iface.c index bb9120b8fa9..b65d147491f 100644 --- a/src/uct/ib/rc/base/rc_iface.c +++ b/src/uct/ib/rc/base/rc_iface.c @@ -596,7 +596,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md, self->tx.reads_available = config->tx.max_get_bytes; } - self->tx.arb_cbq_id = UCS_CALLBACKQ_ID_NULL; + self->tx.arb_cbq_id = UCS_CALLBACKQ_ID_NULL; + self->tx.reads_completed = 0; uct_ib_fence_info_init(&self->tx.fi); uct_rc_iface_set_path_mtu(self, config); diff --git a/src/uct/ib/rc/base/rc_iface.h b/src/uct/ib/rc/base/rc_iface.h index 7139329f3c1..ba2b8f89011 100644 --- a/src/uct/ib/rc/base/rc_iface.h +++ b/src/uct/ib/rc/base/rc_iface.h @@ -208,6 +208,7 @@ struct uct_rc_iface { * credit */ signed cq_available; ssize_t reads_available; + ssize_t reads_completed; uct_rc_iface_send_op_t *free_ops; /* stack of free send operations */ ucs_arbiter_t arbiter; uct_rc_iface_send_op_t *ops_buffer; @@ -393,6 +394,15 @@ uct_rc_iface_have_tx_cqe_avail(uct_rc_iface_t* iface) return iface->tx.cq_available > 0; } +static UCS_F_ALWAYS_INLINE void +uct_rc_iface_update_reads(uct_rc_iface_t *iface) +{ + ucs_assert(iface->tx.reads_completed >= 0); + + iface->tx.reads_available += iface->tx.reads_completed; + iface->tx.reads_completed = 0; +} + static UCS_F_ALWAYS_INLINE uct_rc_iface_send_op_t* uct_rc_iface_get_send_op(uct_rc_iface_t *iface) { diff --git a/src/uct/ib/rc/verbs/rc_verbs_ep.c b/src/uct/ib/rc/verbs/rc_verbs_ep.c index ed8b85a091c..e8d94ee2505 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_ep.c +++ b/src/uct/ib/rc/verbs/rc_verbs_ep.c @@ -252,7 +252,7 @@ ucs_status_t uct_rc_verbs_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr, uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t); UCT_RC_CHECK_AM_SHORT(id, length, iface->config.max_inline); - UCT_RC_CHECK_RES(&iface->super, &ep->super); + UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super); UCT_RC_CHECK_FC(&iface->super, &ep->super, id); uct_rc_verbs_iface_fill_inl_am_sge(iface, id, hdr, buffer, length); UCT_TL_EP_STAT_OP(&ep->super.super, AM, SHORT, sizeof(hdr) + length); @@ -276,7 +276,7 @@ ssize_t uct_rc_verbs_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id, UCT_CHECK_AM_ID(id); - UCT_RC_CHECK_RES(&iface->super, &ep->super); + UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super); UCT_RC_CHECK_FC(&iface->super, &ep->super, id); UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC(&iface->super, &iface->super.tx.mp, desc, id, uct_rc_am_hdr_fill, uct_rc_hdr_t, @@ -309,7 +309,7 @@ ucs_status_t uct_rc_verbs_ep_am_zcopy(uct_ep_h tl_ep, uint8_t id, const void *he UCT_RC_CHECK_AM_ZCOPY(id, header_length, uct_iov_total_length(iov, iovcnt), iface->config.short_desc_size, iface->super.super.config.seg_size); - UCT_RC_CHECK_RES(&iface->super, &ep->super); + UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super); UCT_RC_CHECK_FC(&iface->super, &ep->super, id); UCT_RC_IFACE_GET_TX_AM_ZCOPY_DESC(&iface->super, &iface->short_desc_mp, @@ -461,7 +461,7 @@ ucs_status_t uct_rc_verbs_ep_handle_failure(uct_rc_verbs_ep_t *ep, iface->tx.cq_available += ep->txcnt.pi - ep->txcnt.ci; /* Reset CI to prevent cq_available overrun on ep_destroy */ ep->txcnt.ci = ep->txcnt.pi; - uct_rc_txqp_purge_outstanding(&ep->super.txqp, status, 0); + uct_rc_txqp_purge_outstanding(iface, &ep->super.txqp, status, 0); return iface->super.ops->set_ep_failed(&iface->super, &ep->super.super.super, status); diff --git a/src/uct/ib/rc/verbs/rc_verbs_iface.c b/src/uct/ib/rc/verbs/rc_verbs_iface.c index 9e709133644..6338a1be01c 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_iface.c +++ b/src/uct/ib/rc/verbs/rc_verbs_iface.c @@ -109,9 +109,14 @@ uct_rc_verbs_iface_poll_tx(uct_rc_verbs_iface_t *iface) count = uct_rc_verbs_txcq_get_comp_count(&wc[i], &ep->super.txqp); ucs_trace_poll("rc_verbs iface %p tx_wc wrid 0x%lx ep %p qpn 0x%x count %d", iface, wc[i].wr_id, ep, wc[i].qp_num, count); + + uct_rc_txqp_completion_desc(&ep->super.txqp, ep->txcnt.ci); + uct_rc_verbs_txqp_completed(&ep->super.txqp, &ep->txcnt, count); iface->super.tx.cq_available += count; + uct_rc_iface_update_reads(&iface->super); + /* process pending elements prior to CQ entries to avoid out-of-order * transmission in completion callbacks */ ucs_arbiter_group_schedule(&iface->super.tx.arbiter, @@ -119,7 +124,6 @@ uct_rc_verbs_iface_poll_tx(uct_rc_verbs_iface_t *iface) ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending, NULL); - uct_rc_txqp_completion_desc(&ep->super.txqp, ep->txcnt.ci); } return num_wcs; diff --git a/test/gtest/uct/ib/test_rc.cc b/test/gtest/uct/ib/test_rc.cc index 76824cece09..3bf787c95bf 100644 --- a/test/gtest/uct/ib/test_rc.cc +++ b/test/gtest/uct/ib/test_rc.cc @@ -139,6 +139,12 @@ UCT_INSTANTIATE_RC_TEST_CASE(test_rc_max_wr) class test_rc_get_limit : public test_rc { public: + struct am_completion_t { + uct_completion_t uct; + uct_ep_h ep; + int cb_count; + }; + test_rc_get_limit() { m_num_get_bytes = 8 * UCS_KBYTE + 557; // some non power of 2 modify_config("RC_TX_NUM_GET_BYTES", @@ -206,10 +212,52 @@ class test_rc_get_limit : public test_rc { return 0ul; } + void add_pending_ams(pending_send_request_t *reqs, int num_reqs) { + for (int i = 0; i < num_reqs; ++i) { + reqs[i].uct.func = pending_cb_send_am; + reqs[i].ep = m_e1->ep(0); + reqs[i].cb_count = i; + ASSERT_UCS_OK(uct_ep_pending_add(m_e1->ep(0), &reqs[i].uct, 0)); + } + } + + static ucs_status_t pending_cb_send_am(uct_pending_req_t *self) { + pending_send_request_t *req = ucs_container_of(self, + pending_send_request_t, + uct); + + return uct_ep_am_short(req->ep, AM_CHECK_ORDER_ID, req->cb_count, + NULL, 0); + } + + static ucs_status_t am_handler_ordering(void *arg, void *data, + size_t length, unsigned flags) { + uint64_t *prev_sn = (uint64_t*)arg; + uint64_t sn = *(uint64_t*)data; + + EXPECT_LE(*prev_sn, sn); + + *prev_sn = sn; + + return UCS_OK; + } + + static void get_comp_cb(uct_completion_t *self, ucs_status_t c_status) { + am_completion_t *comp = ucs_container_of(self, am_completion_t, uct); + + EXPECT_UCS_OK(c_status); + + ucs_status_t status = uct_ep_am_short(comp->ep, AM_CHECK_ORDER_ID, + comp->cb_count, NULL, 0); + EXPECT_TRUE(!UCS_STATUS_IS_ERR(status) || + (status == UCS_ERR_NO_RESOURCE)); + } + protected: - unsigned m_num_get_bytes; - unsigned m_max_get_zcopy; - uct_completion_t m_comp; + static const uint8_t AM_CHECK_ORDER_ID = 1; + unsigned m_num_get_bytes; + unsigned m_max_get_zcopy; + uct_completion_t m_comp; }; UCS_TEST_SKIP_COND_P(test_rc_get_limit, get_ops_limit, @@ -395,6 +443,91 @@ UCS_TEST_SKIP_COND_P(test_rc_get_limit, get_zcopy_purge, EXPECT_EQ(m_num_get_bytes, reads_available(m_e1)); } +// Check that it is not possible to send while not all pendings are dispatched +// yet. RDMA_READ resources are released in get function completion callbacks. +// Since in RC transports completions are handled after pending dispatch +// (to preserve ordering), RDMA_READ resources should be returned to iface +// in deferred manner. +UCS_TEST_SKIP_COND_P(test_rc_get_limit, ordering_pending, + !check_caps(UCT_IFACE_FLAG_GET_ZCOPY | + UCT_IFACE_FLAG_GET_BCOPY | + UCT_IFACE_FLAG_AM_SHORT | + UCT_IFACE_FLAG_PENDING)) +{ + volatile uint64_t sn = 0; + ucs_status_t status; + + uct_iface_set_am_handler(m_e2->iface(), AM_CHECK_ORDER_ID, + am_handler_ordering, (void*)&sn, 0); + + mapped_buffer sendbuf(1024, 0ul, *m_e1); + mapped_buffer recvbuf(1024, 0ul, *m_e2); + + post_max_reads(m_e1, sendbuf, recvbuf); + + EXPECT_EQ(UCS_ERR_NO_RESOURCE, + uct_ep_am_short(m_e1->ep(0), AM_CHECK_ORDER_ID, 0, NULL, 0)); + + const uint64_t num_pend = 3; + pending_send_request_t reqs[num_pend]; + add_pending_ams(reqs, num_pend); + + do { + progress(); + status = uct_ep_am_short(m_e1->ep(0), AM_CHECK_ORDER_ID, num_pend, + NULL, 0); + } while (status != UCS_OK); + + wait_for_value(&sn, num_pend, true); + EXPECT_EQ(num_pend, sn); + + flush(); + EXPECT_EQ(m_num_get_bytes, reads_available(m_e1)); +} + +UCS_TEST_SKIP_COND_P(test_rc_get_limit, ordering_comp_cb, + !check_caps(UCT_IFACE_FLAG_GET_ZCOPY | + UCT_IFACE_FLAG_GET_BCOPY | + UCT_IFACE_FLAG_AM_SHORT | + UCT_IFACE_FLAG_PENDING)) +{ + volatile uint64_t sn = 0; + const uint64_t num_pend = 3; + + uct_iface_set_am_handler(m_e2->iface(), AM_CHECK_ORDER_ID, + am_handler_ordering, (void*)&sn, 0); + + mapped_buffer sendbuf(1024, 0ul, *m_e1); + mapped_buffer recvbuf(1024, 0ul, *m_e2); + + am_completion_t comp; + comp.uct.func = get_comp_cb; + comp.uct.count = 1; + comp.uct.status = UCS_OK; + comp.ep = m_e1->ep(0); + comp.cb_count = num_pend; + ucs_status_t status = uct_ep_get_bcopy(m_e1->ep(0), + (uct_unpack_callback_t)memcpy, + sendbuf.ptr(), sendbuf.length(), + recvbuf.addr(), recvbuf.rkey(), + &comp.uct); + ASSERT_FALSE(UCS_STATUS_IS_ERR(status)); + + post_max_reads(m_e1, sendbuf, recvbuf); + + EXPECT_EQ(UCS_ERR_NO_RESOURCE, + uct_ep_am_short(m_e1->ep(0), AM_CHECK_ORDER_ID, 0, NULL, 0)); + + pending_send_request_t reqs[num_pend]; + add_pending_ams(reqs, num_pend); + + wait_for_value(&sn, num_pend - 1, true); + EXPECT_EQ(num_pend - 1, sn); + + flush(); + EXPECT_EQ(m_num_get_bytes, reads_available(m_e1)); +} + UCT_INSTANTIATE_RC_DC_TEST_CASE(test_rc_get_limit) uint32_t test_rc_flow_control::m_am_rx_count = 0; diff --git a/test/gtest/uct/ib/test_rc.h b/test/gtest/uct/ib/test_rc.h index e3273574623..77868b20561 100644 --- a/test/gtest/uct/ib/test_rc.h +++ b/test/gtest/uct/ib/test_rc.h @@ -19,6 +19,13 @@ extern "C" { class test_rc : public uct_test { public: + typedef struct pending_send_request { + uct_pending_req_t uct; + int cb_count; + int purge_count; + uct_ep_h ep; + } pending_send_request_t; + virtual void init(); void connect(); @@ -55,12 +62,6 @@ class test_rc : public uct_test { class test_rc_flow_control : public test_rc { public: - typedef struct pending_send_request { - uct_pending_req_t uct; - int cb_count; - int purge_count; - } pending_send_request_t; - void init(); void cleanup();