Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/DC: Don't consider fc_wnd if flow control is disabled - v1.3.x #2321

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/uct/ib/dc/base/dc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ static inline int uct_dc_iface_dci_ep_can_send(uct_dc_ep_t *ep)
{
uct_dc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_iface_t);
return (!(ep->flags & UCT_DC_EP_FLAG_TX_WAIT)) &&
(ep->fc.fc_wnd > 0) &&
uct_rc_fc_has_resources(&iface->super, &ep->fc) &&
uct_dc_iface_dci_has_tx_resources(iface, ep->dci);
}

Expand All @@ -176,7 +176,7 @@ void uct_dc_iface_schedule_dci_alloc(uct_dc_iface_t *iface, uct_dc_ep_t *ep)
{
/* If FC window is empty the group will be scheduled when
* grant is received */
if (ep->fc.fc_wnd > 0) {
if (uct_rc_fc_has_resources(&iface->super, &ep->fc)) {
ucs_arbiter_group_schedule(uct_dc_iface_dci_waitq(iface), &ep->arb_group);
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/uct/ib/rc/base/rc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ enum {

#define UCT_RC_UPDATE_FC_WND(_iface, _fc) \
{ \
/* For performance reasons, prefer to update fc_wnd unconditionally */ \
(_fc)->fc_wnd--; \
\
if ((_iface)->config.fc_enabled) { \
Expand Down Expand Up @@ -304,9 +305,19 @@ static UCS_F_ALWAYS_INLINE void uct_rc_txqp_check(uct_rc_txqp_t *txqp)
txqp->qp->qp_num, txqp->qp->state);
}

/**
 * Check whether flow control permits sending with the given FC state.
 *
 * @param iface  RC interface; its config.fc_enabled flag is consulted.
 * @param fc     Flow-control state whose fc_wnd is examined.
 *
 * @return Non-zero if fc_wnd is positive or flow control is disabled on the
 *         interface, zero otherwise.
 */
static UCS_F_ALWAYS_INLINE
int uct_rc_fc_has_resources(uct_rc_iface_t *iface, uct_rc_fc_t *fc)
{
    if (fc->fc_wnd > 0) {
        return 1;
    }

    /* fc_wnd is decremented unconditionally (for performance reasons), so it
     * may reach 0 even when FC is disabled; in that case the window is
     * ignored. */
    return !iface->config.fc_enabled;
}

static UCS_F_ALWAYS_INLINE int uct_rc_ep_has_tx_resources(uct_rc_ep_t *ep)
{
return ((ep->txqp.available > 0) && (ep->fc.fc_wnd > 0));
uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t);

return (ep->txqp.available > 0) && uct_rc_fc_has_resources(iface, &ep->fc);
}

static UCS_F_ALWAYS_INLINE void
Expand Down
43 changes: 39 additions & 4 deletions test/gtest/uct/ib/test_dc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ class test_dc_flow_control : public test_rc_flow_control {
public:

/* virtual */
uct_rc_fc_t* get_fc_ptr(entity *e) {
return &ucs_derived_of(e->ep(0), uct_dc_ep_t)->fc;
uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) {
return &ucs_derived_of(e->ep(ep_idx), uct_dc_ep_t)->fc;
}
};

Expand All @@ -395,6 +395,41 @@ UCS_TEST_P(test_dc_flow_control, pending_grant)
flush();
}

/* Runs the shared FC-disabled flush scenario (test_flush_fc_disabled) using
 * the DC flow-control fixture. */
UCS_TEST_P(test_dc_flow_control, fc_disabled_flush)
{
test_flush_fc_disabled();
}

/* With flow control disabled, a pending request queued on an endpoint that
 * failed to send with UCS_ERR_NO_RESOURCE must still be dispatched even though
 * its fc_wnd is 0. */
UCS_TEST_P(test_dc_flow_control, fc_disabled_pending_no_dci) {

    pending_send_request_t req;
    req.uct.func = pending_cb;
    req.cb_count = 0;

    set_fc_disabled(m_e1);

    /* Send on new endpoints until out of DCIs */
    for (int idx = 0; idx < 20; ++idx) {
        m_e1->connect(idx, *m_e2, idx);

        ucs_status_t status = uct_ep_am_short(m_e1->ep(idx), 0, 0, NULL, 0);
        if (status != UCS_ERR_NO_RESOURCE) {
            /* Send went through (or failed unexpectedly): try the next ep */
            ASSERT_UCS_OK(status);
            continue;
        }

        /* if FC is disabled, it should be OK to set fc_wnd to 0 */
        get_fc_ptr(m_e1, idx)->fc_wnd = 0;

        /* Add to pending */
        status = uct_ep_pending_add(m_e1->ep(idx), &req.uct);
        ASSERT_UCS_OK(status);

        /* The pending callback is expected to fire exactly once */
        wait_for_flag(&req.cb_count);
        EXPECT_EQ(1, req.cb_count);
        break;
    }
}

/* Check that soft request is not handled by DC */
UCS_TEST_P(test_dc_flow_control, soft_request)
{
Expand Down Expand Up @@ -487,8 +522,8 @@ class test_dc_flow_control_stats : public test_rc_flow_control_stats {
test_rc_flow_control_stats::init();
}

uct_rc_fc_t* get_fc_ptr(entity *e) {
return &ucs_derived_of(e->ep(0), uct_dc_ep_t)->fc;
uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) {
return &ucs_derived_of(e->ep(ep_idx), uct_dc_ep_t)->fc;
}

uct_rc_fc_t* fake_ep_fc_ptr(entity *e) {
Expand Down
53 changes: 46 additions & 7 deletions test/gtest/uct/ib/test_rc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ UCS_TEST_P(test_rc_max_wr, send_limit)
UCT_RC_INSTANTIATE_TEST_CASE(test_rc_max_wr)


int test_rc_flow_control::m_req_count = 0;
uint32_t test_rc_flow_control::m_am_rx_count = 0;

void test_rc_flow_control::init()
Expand All @@ -78,13 +77,24 @@ void test_rc_flow_control::init()
}
test_rc::init();

ucs_assert(rc_iface(m_e1)->config.fc_enabled);
ucs_assert(rc_iface(m_e2)->config.fc_enabled);

uct_iface_set_am_handler(m_e1->iface(), FLUSH_AM_ID, am_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e2->iface(), FLUSH_AM_ID, am_handler,
NULL, UCT_CB_FLAG_SYNC);

}

/* Fixture teardown: sets fc_enabled back to 1 on the RC interfaces of both
 * entities, then delegates to test_rc::cleanup(). */
void test_rc_flow_control::cleanup()
{
/* Restore FC state to enabled, so iface cleanup will destroy the grant mpool */
/* NOTE(review): presumably needed because tests such as those calling
 * set_fc_disabled() may have turned the flag off - confirm. */
rc_iface(m_e1)->config.fc_enabled = 1;
rc_iface(m_e2)->config.fc_enabled = 1;
/* Base-class cleanup runs only after the flags have been restored */
test_rc::cleanup();
}

void test_rc_flow_control::send_am_and_flush(entity *e, int num_msg)
{
m_am_rx_count = 0;
Expand Down Expand Up @@ -147,24 +157,48 @@ void test_rc_flow_control::test_pending_grant(int wnd)
send_am_messages(m_e1, 1, UCS_OK);
}

void test_rc_flow_control::test_flush_fc_disabled()
{
set_fc_disabled(m_e1);
ucs_status_t status;

/* If FC is disabled, wnd=0 should not prevent the flush */
get_fc_ptr(m_e1)->fc_wnd = 0;
status = uct_ep_flush(m_e1->ep(0), 0, NULL);
EXPECT_EQ(UCS_OK, status);

/* send active message should be OK */
get_fc_ptr(m_e1)->fc_wnd = 1;
status = uct_ep_am_short(m_e1->ep(0), 0, 0, NULL, 0);
EXPECT_EQ(UCS_OK, status);
EXPECT_EQ(0, get_fc_ptr(m_e1)->fc_wnd);

/* flush must have resources */
status = uct_ep_flush(m_e1->ep(0), 0, NULL);
EXPECT_FALSE(UCS_STATUS_IS_ERR(status)) << ucs_status_string(status);
}

void test_rc_flow_control::test_pending_purge(int wnd, int num_pend_sends)
{
uct_pending_req_t reqs[num_pend_sends];
pending_send_request_t reqs[num_pend_sends];

disable_entity(m_e2);
set_fc_attributes(m_e1, true, wnd, wnd, 1);

m_req_count = 0;
send_am_and_flush(m_e1, wnd);

/* Now m2 ep should have FC grant message in the pending queue.
* Add some user pending requests as well */
for (int i = 0; i < num_pend_sends; i ++) {
reqs[i].func = NULL; /* make valgrind happy */
EXPECT_EQ(uct_ep_pending_add(m_e2->ep(0), &reqs[i]), UCS_OK);
for (int i = 0; i < num_pend_sends; i++) {
reqs[i].uct.func = NULL; /* make valgrind happy */
reqs[i].purge_count = 0;
EXPECT_EQ(uct_ep_pending_add(m_e2->ep(0), &reqs[i].uct), UCS_OK);
}
uct_ep_pending_purge(m_e2->ep(0), purge_cb, NULL);
EXPECT_EQ(num_pend_sends, m_req_count);

for (int i = 0; i < num_pend_sends; i++) {
EXPECT_EQ(1, reqs[i].purge_count);
}
}


Expand Down Expand Up @@ -206,6 +240,11 @@ UCS_TEST_P(test_rc_flow_control, pending_grant)
test_pending_grant(5);
}

/* Runs the FC-disabled flush scenario (test_flush_fc_disabled) using the RC
 * flow-control fixture. */
UCS_TEST_P(test_rc_flow_control, fc_disabled_flush)
{
test_flush_fc_disabled();
}

UCT_RC_INSTANTIATE_TEST_CASE(test_rc_flow_control)


Expand Down
33 changes: 26 additions & 7 deletions test/gtest/uct/ib/test_rc.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class test_rc : public uct_test {
return ucs_derived_of(e->iface(), uct_rc_iface_t);
}

uct_rc_ep_t* rc_ep(entity *e) {
return ucs_derived_of(e->ep(0), uct_rc_ep_t);
uct_rc_ep_t* rc_ep(entity *e, int ep_idx = 0) {
return ucs_derived_of(e->ep(ep_idx), uct_rc_ep_t);
}

void send_am_messages(entity *e, int wnd, ucs_status_t expected,
Expand All @@ -54,14 +54,16 @@ class test_rc : public uct_test {
class test_rc_flow_control : public test_rc {
public:
typedef struct pending_send_request {
uct_ep_h ep;
uct_pending_req_t uct;
int cb_count;
int purge_count;
} pending_send_request_t;

void init();
void cleanup();

virtual uct_rc_fc_t* get_fc_ptr(entity *e) {
return &rc_ep(e)->fc;
virtual uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) {
return &rc_ep(e, ep_idx)->fc;
}

virtual void disable_entity(entity *e) {
Expand All @@ -84,6 +86,11 @@ class test_rc_flow_control : public test_rc {

}

/* Put entity e into the flow-control-disabled configuration by calling
 * set_fc_attributes() with 'false', the maximal int16_t value and two zeros.
 * NOTE(review): the parameter meanings of set_fc_attributes() are not visible
 * here; presumably (entity, enabled, window, and two thresholds) - confirm. */
void set_fc_disabled(entity *e) {
/* same as default settings in rc_iface_init */
set_fc_attributes(e, false, std::numeric_limits<int16_t>::max(), 0, 0);
}

void send_am_and_flush(entity *e, int num_msg);

void progress_loop(double delta_ms=10.0) {
Expand All @@ -102,7 +109,18 @@ class test_rc_flow_control : public test_rc {
}

static void purge_cb(uct_pending_req_t *self, void *arg) {
m_req_count++;
pending_send_request_t *req = ucs_container_of(self,
pending_send_request_t,
uct);
++req->purge_count;
}

/* Pending-request callback: counts the invocation on the owning
 * pending_send_request_t and returns UCS_OK. */
static ucs_status_t pending_cb(uct_pending_req_t *self) {
    /* 'self' is the 'uct' member embedded in a pending_send_request_t */
    pending_send_request_t *owner =
        ucs_container_of(self, pending_send_request_t, uct);

    owner->cb_count += 1;
    return UCS_OK;
}

void validate_grant(entity *e);
Expand All @@ -113,8 +131,9 @@ class test_rc_flow_control : public test_rc {

void test_pending_purge(int wnd, int num_pend_sends);

void test_flush_fc_disabled();

protected:
static int m_req_count;
static const uint8_t FLUSH_AM_ID = 1;
static uint32_t m_am_rx_count;
};
Expand Down