Skip to content

Commit

Permalink
Merge pull request #2321 from yosefe/topic/uct-dc-fix-hang-fc-disable…
Browse files Browse the repository at this point in the history
…d-v1.3.x

UCT/DC: Don't consider fc_wnd if flow control is disabled - v1.3.x
  • Loading branch information
yosefe authored Feb 17, 2018
2 parents c9ef2c6 + 647c01a commit 0b45e29
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 21 deletions.
4 changes: 2 additions & 2 deletions src/uct/ib/dc/base/dc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ static inline int uct_dc_iface_dci_ep_can_send(uct_dc_ep_t *ep)
{
uct_dc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_iface_t);
return (!(ep->flags & UCT_DC_EP_FLAG_TX_WAIT)) &&
(ep->fc.fc_wnd > 0) &&
uct_rc_fc_has_resources(&iface->super, &ep->fc) &&
uct_dc_iface_dci_has_tx_resources(iface, ep->dci);
}

Expand All @@ -176,7 +176,7 @@ void uct_dc_iface_schedule_dci_alloc(uct_dc_iface_t *iface, uct_dc_ep_t *ep)
{
/* If FC window is empty the group will be scheduled when
* grant is received */
if (ep->fc.fc_wnd > 0) {
if (uct_rc_fc_has_resources(&iface->super, &ep->fc)) {
ucs_arbiter_group_schedule(uct_dc_iface_dci_waitq(iface), &ep->arb_group);
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/uct/ib/rc/base/rc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ enum {

#define UCT_RC_UPDATE_FC_WND(_iface, _fc) \
{ \
/* For performance reasons, prefer to update fc_wnd unconditionally */ \
(_fc)->fc_wnd--; \
\
if ((_iface)->config.fc_enabled) { \
Expand Down Expand Up @@ -304,9 +305,19 @@ static UCS_F_ALWAYS_INLINE void uct_rc_txqp_check(uct_rc_txqp_t *txqp)
txqp->qp->qp_num, txqp->qp->state);
}

static UCS_F_ALWAYS_INLINE
int uct_rc_fc_has_resources(uct_rc_iface_t *iface, uct_rc_fc_t *fc)
{
/* When FC is disabled, fc_wnd may still become 0 because it's decremented
* unconditionally (for performance reasons) */
return (fc->fc_wnd > 0) || !iface->config.fc_enabled;
}

static UCS_F_ALWAYS_INLINE int uct_rc_ep_has_tx_resources(uct_rc_ep_t *ep)
{
return ((ep->txqp.available > 0) && (ep->fc.fc_wnd > 0));
uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t);

return (ep->txqp.available > 0) && uct_rc_fc_has_resources(iface, &ep->fc);
}

static UCS_F_ALWAYS_INLINE void
Expand Down
43 changes: 39 additions & 4 deletions test/gtest/uct/ib/test_dc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ class test_dc_flow_control : public test_rc_flow_control {
public:

/* virtual */
uct_rc_fc_t* get_fc_ptr(entity *e) {
return &ucs_derived_of(e->ep(0), uct_dc_ep_t)->fc;
uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) {
return &ucs_derived_of(e->ep(ep_idx), uct_dc_ep_t)->fc;
}
};

Expand All @@ -395,6 +395,41 @@ UCS_TEST_P(test_dc_flow_control, pending_grant)
flush();
}

UCS_TEST_P(test_dc_flow_control, fc_disabled_flush)
{
test_flush_fc_disabled();
}

UCS_TEST_P(test_dc_flow_control, fc_disabled_pending_no_dci) {

pending_send_request_t pending_req;
pending_req.uct.func = pending_cb;
pending_req.cb_count = 0;

set_fc_disabled(m_e1);

/* Send on new endpoints until out of DCIs */
for (int ep_index = 0; ep_index < 20; ++ep_index) {
m_e1->connect(ep_index, *m_e2, ep_index);

ucs_status_t status = uct_ep_am_short(m_e1->ep(ep_index), 0, 0, NULL, 0);
if (status == UCS_ERR_NO_RESOURCE) {
/* if FC is disabled, it should be OK to set fc_wnd to 0 */
get_fc_ptr(m_e1, ep_index)->fc_wnd = 0;

/* Add to pending */
status = uct_ep_pending_add(m_e1->ep(ep_index), &pending_req.uct);
ASSERT_UCS_OK(status);

wait_for_flag(&pending_req.cb_count);
EXPECT_EQ(1, pending_req.cb_count);
break;
}

ASSERT_UCS_OK(status);
}
}

/* Check that soft request is not handled by DC */
UCS_TEST_P(test_dc_flow_control, soft_request)
{
Expand Down Expand Up @@ -487,8 +522,8 @@ class test_dc_flow_control_stats : public test_rc_flow_control_stats {
test_rc_flow_control_stats::init();
}

uct_rc_fc_t* get_fc_ptr(entity *e) {
return &ucs_derived_of(e->ep(0), uct_dc_ep_t)->fc;
uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) {
return &ucs_derived_of(e->ep(ep_idx), uct_dc_ep_t)->fc;
}

uct_rc_fc_t* fake_ep_fc_ptr(entity *e) {
Expand Down
53 changes: 46 additions & 7 deletions test/gtest/uct/ib/test_rc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ UCS_TEST_P(test_rc_max_wr, send_limit)
UCT_RC_INSTANTIATE_TEST_CASE(test_rc_max_wr)


int test_rc_flow_control::m_req_count = 0;
uint32_t test_rc_flow_control::m_am_rx_count = 0;

void test_rc_flow_control::init()
Expand All @@ -78,13 +77,24 @@ void test_rc_flow_control::init()
}
test_rc::init();

ucs_assert(rc_iface(m_e1)->config.fc_enabled);
ucs_assert(rc_iface(m_e2)->config.fc_enabled);

uct_iface_set_am_handler(m_e1->iface(), FLUSH_AM_ID, am_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e2->iface(), FLUSH_AM_ID, am_handler,
NULL, UCT_CB_FLAG_SYNC);

}

void test_rc_flow_control::cleanup()
{
/* Restore FC state to enabled, so iface cleanup will destroy the grant mpool */
rc_iface(m_e1)->config.fc_enabled = 1;
rc_iface(m_e2)->config.fc_enabled = 1;
test_rc::cleanup();
}

void test_rc_flow_control::send_am_and_flush(entity *e, int num_msg)
{
m_am_rx_count = 0;
Expand Down Expand Up @@ -147,24 +157,48 @@ void test_rc_flow_control::test_pending_grant(int wnd)
send_am_messages(m_e1, 1, UCS_OK);
}

void test_rc_flow_control::test_flush_fc_disabled()
{
set_fc_disabled(m_e1);
ucs_status_t status;

/* If FC is disabled, wnd=0 should not prevent the flush */
get_fc_ptr(m_e1)->fc_wnd = 0;
status = uct_ep_flush(m_e1->ep(0), 0, NULL);
EXPECT_EQ(UCS_OK, status);

/* send active message should be OK */
get_fc_ptr(m_e1)->fc_wnd = 1;
status = uct_ep_am_short(m_e1->ep(0), 0, 0, NULL, 0);
EXPECT_EQ(UCS_OK, status);
EXPECT_EQ(0, get_fc_ptr(m_e1)->fc_wnd);

/* flush must have resources */
status = uct_ep_flush(m_e1->ep(0), 0, NULL);
EXPECT_FALSE(UCS_STATUS_IS_ERR(status)) << ucs_status_string(status);
}

void test_rc_flow_control::test_pending_purge(int wnd, int num_pend_sends)
{
uct_pending_req_t reqs[num_pend_sends];
pending_send_request_t reqs[num_pend_sends];

disable_entity(m_e2);
set_fc_attributes(m_e1, true, wnd, wnd, 1);

m_req_count = 0;
send_am_and_flush(m_e1, wnd);

/* Now m2 ep should have FC grant message in the pending queue.
* Add some user pending requests as well */
for (int i = 0; i < num_pend_sends; i ++) {
reqs[i].func = NULL; /* make valgrind happy */
EXPECT_EQ(uct_ep_pending_add(m_e2->ep(0), &reqs[i]), UCS_OK);
for (int i = 0; i < num_pend_sends; i++) {
reqs[i].uct.func = NULL; /* make valgrind happy */
reqs[i].purge_count = 0;
EXPECT_EQ(uct_ep_pending_add(m_e2->ep(0), &reqs[i].uct), UCS_OK);
}
uct_ep_pending_purge(m_e2->ep(0), purge_cb, NULL);
EXPECT_EQ(num_pend_sends, m_req_count);

for (int i = 0; i < num_pend_sends; i++) {
EXPECT_EQ(1, reqs[i].purge_count);
}
}


Expand Down Expand Up @@ -206,6 +240,11 @@ UCS_TEST_P(test_rc_flow_control, pending_grant)
test_pending_grant(5);
}

UCS_TEST_P(test_rc_flow_control, fc_disabled_flush)
{
test_flush_fc_disabled();
}

UCT_RC_INSTANTIATE_TEST_CASE(test_rc_flow_control)


Expand Down
33 changes: 26 additions & 7 deletions test/gtest/uct/ib/test_rc.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class test_rc : public uct_test {
return ucs_derived_of(e->iface(), uct_rc_iface_t);
}

uct_rc_ep_t* rc_ep(entity *e) {
return ucs_derived_of(e->ep(0), uct_rc_ep_t);
uct_rc_ep_t* rc_ep(entity *e, int ep_idx = 0) {
return ucs_derived_of(e->ep(ep_idx), uct_rc_ep_t);
}

void send_am_messages(entity *e, int wnd, ucs_status_t expected,
Expand All @@ -54,14 +54,16 @@ class test_rc : public uct_test {
class test_rc_flow_control : public test_rc {
public:
typedef struct pending_send_request {
uct_ep_h ep;
uct_pending_req_t uct;
int cb_count;
int purge_count;
} pending_send_request_t;

void init();
void cleanup();

virtual uct_rc_fc_t* get_fc_ptr(entity *e) {
return &rc_ep(e)->fc;
virtual uct_rc_fc_t* get_fc_ptr(entity *e, int ep_idx = 0) {
return &rc_ep(e, ep_idx)->fc;
}

virtual void disable_entity(entity *e) {
Expand All @@ -84,6 +86,11 @@ class test_rc_flow_control : public test_rc {

}

void set_fc_disabled(entity *e) {
/* same as default settings in rc_iface_init */
set_fc_attributes(e, false, std::numeric_limits<int16_t>::max(), 0, 0);
}

void send_am_and_flush(entity *e, int num_msg);

void progress_loop(double delta_ms=10.0) {
Expand All @@ -102,7 +109,18 @@ class test_rc_flow_control : public test_rc {
}

static void purge_cb(uct_pending_req_t *self, void *arg) {
m_req_count++;
pending_send_request_t *req = ucs_container_of(self,
pending_send_request_t,
uct);
++req->purge_count;
}

static ucs_status_t pending_cb(uct_pending_req_t *self) {
pending_send_request_t *req = ucs_container_of(self,
pending_send_request_t,
uct);
++req->cb_count;
return UCS_OK;
}

void validate_grant(entity *e);
Expand All @@ -113,8 +131,9 @@ class test_rc_flow_control : public test_rc {

void test_pending_purge(int wnd, int num_pend_sends);

void test_flush_fc_disabled();

protected:
static int m_req_count;
static const uint8_t FLUSH_AM_ID = 1;
static uint32_t m_am_rx_count;
};
Expand Down

0 comments on commit 0b45e29

Please sign in to comment.