diff --git a/src/uct/ib/dc/base/dc_ep.h b/src/uct/ib/dc/base/dc_ep.h index 5a25ae0c3e0..79ff47e0743 100644 --- a/src/uct/ib/dc/base/dc_ep.h +++ b/src/uct/ib/dc/base/dc_ep.h @@ -167,7 +167,7 @@ static inline int uct_dc_iface_dci_ep_can_send(uct_dc_ep_t *ep) { uct_dc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_iface_t); return (!(ep->flags & UCT_DC_EP_FLAG_TX_WAIT)) && - (ep->fc.fc_wnd > 0) && + uct_rc_fc_has_resources(&iface->super, &ep->fc) && uct_dc_iface_dci_has_tx_resources(iface, ep->dci); } @@ -176,7 +176,7 @@ void uct_dc_iface_schedule_dci_alloc(uct_dc_iface_t *iface, uct_dc_ep_t *ep) { /* If FC window is empty the group will be scheduled when * grant is received */ - if (ep->fc.fc_wnd > 0) { + if (uct_rc_fc_has_resources(&iface->super, &ep->fc)) { ucs_arbiter_group_schedule(uct_dc_iface_dci_waitq(iface), &ep->arb_group); } } diff --git a/src/uct/ib/rc/base/rc_ep.h b/src/uct/ib/rc/base/rc_ep.h index 78c9970b509..a8dff96a9f5 100644 --- a/src/uct/ib/rc/base/rc_ep.h +++ b/src/uct/ib/rc/base/rc_ep.h @@ -120,6 +120,7 @@ enum { #define UCT_RC_UPDATE_FC_WND(_iface, _fc) \ { \ + /* For performance reasons, prefer to update fc_wnd unconditionally */ \ (_fc)->fc_wnd--; \ \ if ((_iface)->config.fc_enabled) { \ @@ -304,9 +305,19 @@ static UCS_F_ALWAYS_INLINE void uct_rc_txqp_check(uct_rc_txqp_t *txqp) txqp->qp->qp_num, txqp->qp->state); } +static UCS_F_ALWAYS_INLINE +int uct_rc_fc_has_resources(uct_rc_iface_t *iface, uct_rc_fc_t *fc) +{ + /* When FC is disabled, fc_wnd may still become 0 because it's decremented + * unconditionally (for performance reasons) */ + return (fc->fc_wnd > 0) || !iface->config.fc_enabled; +} + static UCS_F_ALWAYS_INLINE int uct_rc_ep_has_tx_resources(uct_rc_ep_t *ep) { - return ((ep->txqp.available > 0) && (ep->fc.fc_wnd > 0)); + uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t); + + return (ep->txqp.available > 0) && uct_rc_fc_has_resources(iface, &ep->fc); } static UCS_F_ALWAYS_INLINE void diff --git a/test/gtest/uct/ib/test_dc.cc b/test/gtest/uct/ib/test_dc.cc index 87ab9664483..442c59330b8 100644 --- a/test/gtest/uct/ib/test_dc.cc +++ b/test/gtest/uct/ib/test_dc.cc @@ -371,8 +371,8 @@ class test_dc_flow_control : public test_rc_flow_control { public: /* virtual */ - uct_rc_fc_t* get_fc_ptr(entity *e) { - return &ucs_derived_of(e->ep(0), uct_dc_ep_t)->fc; + uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) { + return &ucs_derived_of(e->ep(ep_idx), uct_dc_ep_t)->fc; } }; @@ -395,6 +395,41 @@ UCS_TEST_P(test_dc_flow_control, pending_grant) flush(); } +UCS_TEST_P(test_dc_flow_control, fc_disabled_flush) +{ + test_flush_fc_disabled(); +} + +UCS_TEST_P(test_dc_flow_control, fc_disabled_pending_no_dci) { + + pending_send_request_t pending_req; + pending_req.uct.func = pending_cb; + pending_req.cb_count = 0; + + set_fc_disabled(m_e1); + + /* Send on new endpoints until out of DCIs */ + for (int ep_index = 0; ep_index < 20; ++ep_index) { + m_e1->connect(ep_index, *m_e2, ep_index); + + ucs_status_t status = uct_ep_am_short(m_e1->ep(ep_index), 0, 0, NULL, 0); + if (status == UCS_ERR_NO_RESOURCE) { + /* if FC is disabled, it should be OK to set fc_wnd to 0 */ + get_fc_ptr(m_e1, ep_index)->fc_wnd = 0; + + /* Add to pending */ + status = uct_ep_pending_add(m_e1->ep(ep_index), &pending_req.uct); + ASSERT_UCS_OK(status); + + wait_for_flag(&pending_req.cb_count); + EXPECT_EQ(1, pending_req.cb_count); + break; + } + + ASSERT_UCS_OK(status); + } +} + /* Check that soft request is not handled by DC */ UCS_TEST_P(test_dc_flow_control, soft_request) { @@ -487,8 +522,8 @@ class test_dc_flow_control_stats : public test_rc_flow_control_stats { test_rc_flow_control_stats::init(); } - uct_rc_fc_t* get_fc_ptr(entity *e) { - return &ucs_derived_of(e->ep(0), uct_dc_ep_t)->fc; + uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) { + return &ucs_derived_of(e->ep(ep_idx), uct_dc_ep_t)->fc; } uct_rc_fc_t* fake_ep_fc_ptr(entity *e) { diff --git a/test/gtest/uct/ib/test_rc.cc b/test/gtest/uct/ib/test_rc.cc index ff29fce6596..552dcfb9ebf 100644 --- a/test/gtest/uct/ib/test_rc.cc +++ b/test/gtest/uct/ib/test_rc.cc @@ -67,7 +67,6 @@ UCS_TEST_P(test_rc_max_wr, send_limit) UCT_RC_INSTANTIATE_TEST_CASE(test_rc_max_wr) -int test_rc_flow_control::m_req_count = 0; uint32_t test_rc_flow_control::m_am_rx_count = 0; void test_rc_flow_control::init() @@ -78,6 +77,9 @@ void test_rc_flow_control::init() } test_rc::init(); + ucs_assert(rc_iface(m_e1)->config.fc_enabled); + ucs_assert(rc_iface(m_e2)->config.fc_enabled); + uct_iface_set_am_handler(m_e1->iface(), FLUSH_AM_ID, am_handler, NULL, UCT_CB_FLAG_SYNC); uct_iface_set_am_handler(m_e2->iface(), FLUSH_AM_ID, am_handler, @@ -85,6 +87,14 @@ void test_rc_flow_control::init() } +void test_rc_flow_control::cleanup() +{ + /* Restore FC state to enabled, so iface cleanup will destroy the grant mpool */ + rc_iface(m_e1)->config.fc_enabled = 1; + rc_iface(m_e2)->config.fc_enabled = 1; + test_rc::cleanup(); +} + void test_rc_flow_control::send_am_and_flush(entity *e, int num_msg) { m_am_rx_count = 0; @@ -147,24 +157,48 @@ void test_rc_flow_control::test_pending_grant(int wnd) send_am_messages(m_e1, 1, UCS_OK); } +void test_rc_flow_control::test_flush_fc_disabled() +{ + set_fc_disabled(m_e1); + ucs_status_t status; + + /* If FC is disabled, wnd=0 should not prevent the flush */ + get_fc_ptr(m_e1)->fc_wnd = 0; + status = uct_ep_flush(m_e1->ep(0), 0, NULL); + EXPECT_EQ(UCS_OK, status); + + /* send active message should be OK */ + get_fc_ptr(m_e1)->fc_wnd = 1; + status = uct_ep_am_short(m_e1->ep(0), 0, 0, NULL, 0); + EXPECT_EQ(UCS_OK, status); + EXPECT_EQ(0, get_fc_ptr(m_e1)->fc_wnd); + + /* flush must have resources */ + status = uct_ep_flush(m_e1->ep(0), 0, NULL); + EXPECT_FALSE(UCS_STATUS_IS_ERR(status)) << ucs_status_string(status); +} + void test_rc_flow_control::test_pending_purge(int wnd, int num_pend_sends) { - uct_pending_req_t reqs[num_pend_sends]; + pending_send_request_t reqs[num_pend_sends]; disable_entity(m_e2); set_fc_attributes(m_e1, true, wnd, wnd, 1); - m_req_count = 0; send_am_and_flush(m_e1, wnd); /* Now m2 ep should have FC grant message in the pending queue. * Add some user pending requests as well */ - for (int i = 0; i < num_pend_sends; i ++) { - reqs[i].func = NULL; /* make valgrind happy */ - EXPECT_EQ(uct_ep_pending_add(m_e2->ep(0), &reqs[i]), UCS_OK); + for (int i = 0; i < num_pend_sends; i++) { + reqs[i].uct.func = NULL; /* make valgrind happy */ + reqs[i].purge_count = 0; + EXPECT_EQ(uct_ep_pending_add(m_e2->ep(0), &reqs[i].uct), UCS_OK); } uct_ep_pending_purge(m_e2->ep(0), purge_cb, NULL); - EXPECT_EQ(num_pend_sends, m_req_count); + + for (int i = 0; i < num_pend_sends; i++) { + EXPECT_EQ(1, reqs[i].purge_count); + } } @@ -206,6 +240,11 @@ UCS_TEST_P(test_rc_flow_control, pending_grant) test_pending_grant(5); } +UCS_TEST_P(test_rc_flow_control, fc_disabled_flush) +{ + test_flush_fc_disabled(); +} + UCT_RC_INSTANTIATE_TEST_CASE(test_rc_flow_control) diff --git a/test/gtest/uct/ib/test_rc.h b/test/gtest/uct/ib/test_rc.h index 628247daf8b..d4437035fe9 100644 --- a/test/gtest/uct/ib/test_rc.h +++ b/test/gtest/uct/ib/test_rc.h @@ -26,8 +26,8 @@ class test_rc : public uct_test { return ucs_derived_of(e->iface(), uct_rc_iface_t); } - uct_rc_ep_t* rc_ep(entity *e) { - return ucs_derived_of(e->ep(0), uct_rc_ep_t); + uct_rc_ep_t* rc_ep(entity *e, int ep_idx = 0) { + return ucs_derived_of(e->ep(ep_idx), uct_rc_ep_t); } void send_am_messages(entity *e, int wnd, ucs_status_t expected, @@ -54,14 +54,16 @@ class test_rc : public uct_test { class test_rc_flow_control : public test_rc { public: typedef struct pending_send_request { - uct_ep_h ep; uct_pending_req_t uct; + int cb_count; + int purge_count; } pending_send_request_t; void init(); + void cleanup(); - virtual uct_rc_fc_t* get_fc_ptr(entity *e) { - return &rc_ep(e)->fc; + virtual uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) { + return &rc_ep(e, ep_idx)->fc; } virtual void disable_entity(entity *e) { @@ -84,6 +86,11 @@ class test_rc_flow_control : public test_rc { } + void set_fc_disabled(entity *e) { + /* same as default settings in rc_iface_init */ + set_fc_attributes(e, false, std::numeric_limits::max(), 0, 0); + } + void send_am_and_flush(entity *e, int num_msg); void progress_loop(double delta_ms=10.0) { @@ -102,7 +109,18 @@ class test_rc_flow_control : public test_rc { } static void purge_cb(uct_pending_req_t *self, void *arg) { - m_req_count++; + pending_send_request_t *req = ucs_container_of(self, + pending_send_request_t, + uct); + ++req->purge_count; + } + + static ucs_status_t pending_cb(uct_pending_req_t *self) { + pending_send_request_t *req = ucs_container_of(self, + pending_send_request_t, + uct); + ++req->cb_count; + return UCS_OK; } void validate_grant(entity *e); @@ -113,8 +131,9 @@ class test_rc_flow_control : public test_rc { void test_pending_purge(int wnd, int num_pend_sends); + void test_flush_fc_disabled(); + protected: - static int m_req_count; static const uint8_t FLUSH_AM_ID = 1; static uint32_t m_am_rx_count; };