From 4febb9b4e6c4e11c592444fbb2dbda31c9f29021 Mon Sep 17 00:00:00 2001
From: Mikhail Brinskii
Date: Wed, 7 Sep 2016 00:03:06 +0300
Subject: [PATCH 1/2] UCT/UD: Fix issue N544 (CREQ assert)

Fixed:
- If CREQ and CREP to peer are lost, peer may stay unconnected
- type == UCT_UD_PACKET_CREQ assertion
---
 src/uct/ib/ud/base/ud_ep.c |  55 ++++++++++++-------
 test/gtest/uct/test_ud.cc  | 107 ++++++++++++++++++++++++++-----------
 2 files changed, 112 insertions(+), 50 deletions(-)

diff --git a/src/uct/ib/ud/base/ud_ep.c b/src/uct/ib/ud/base/ud_ep.c
index ea10bd40aa5..2a5b93080a0 100644
--- a/src/uct/ib/ud/base/ud_ep.c
+++ b/src/uct/ib/ud/base/ud_ep.c
@@ -7,6 +7,7 @@
 #include "ud_ep.h"
 #include "ud_iface.h"
 #include "ud_inl.h"
+#include "ud_def.h"

 #include
 #include
@@ -359,13 +360,12 @@ uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
 {
     uct_ud_comp_desc_t *cdesc;
     uct_ud_send_skb_t *skb;
-
     if (ucs_unlikely(UCT_UD_PSN_COMPARE(ack_psn, <=, ep->tx.acked_psn))) {
         return;
     }

     ep->tx.acked_psn = ack_psn;
-
+
     /* Release acknowledged skb's */
     ucs_queue_for_each_extract(skb, &ep->tx.window, queue,
                                UCT_UD_PSN_COMPARE(skb->neth->psn, <=, ack_psn)) {
@@ -446,6 +446,7 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
         ucs_assert_always(ep != NULL);
         ep->rx.ooo_pkts.head_sn = neth->psn;
         uct_ud_peer_copy(&ep->peer, &ctl->peer);
+        uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
         uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_PRIVATE);
     } else {
         if (ep->dest_ep_id == UCT_UD_EP_NULL_ID) {
@@ -463,6 +464,7 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
                  */
                 uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0);
             }
+            uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
         }
     }

@@ -472,7 +474,6 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
     ucs_assert_always(ep->rx.ooo_pkts.head_sn == neth->psn);
     /* scedule connection reply op */
     UCT_UD_EP_HOOK_CALL_RX(ep, neth, sizeof(*neth) + sizeof(*ctl));
-    uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
     uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
     uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_RCVD);
 }
@@ -578,6 +579,9 @@ void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned b
             goto out;
         }
         if (neth->packet_type & UCT_UD_PACKET_FLAG_CTL) {
+            /* No need to track duplications, CREP always goes
+             * with ACK_REQ flag */
+            ep->rx.ooo_pkts.head_sn = neth->psn;
             uct_ud_ep_rx_ctl(iface, ep, (uct_ud_ctl_hdr_t *)(neth + 1));
             goto out;
         }
     }
@@ -740,8 +744,10 @@ static uct_ud_send_skb_t *uct_ud_ep_prepare_crep(uct_ud_ep_t *ep)
     ucs_assert_always(ep->dest_ep_id != UCT_UD_EP_NULL_ID);
     ucs_assert_always(ep->ep_id != UCT_UD_EP_NULL_ID);

-    skb = uct_ud_iface_res_skb_get(iface);
-    ucs_assert_always(skb != NULL);
+    skb = uct_ud_iface_get_tx_skb(iface, ep);
+    if (!skb) {
+        return NULL;
+    }

     neth = skb->neth;
     uct_ud_neth_init_data(ep, neth);
@@ -784,9 +790,9 @@ static uct_ud_send_skb_t *uct_ud_ep_resend(uct_ud_ep_t *ep)
         return NULL;
     }

-    /* creq or crep must remove creq packet from window */
+    /* creq/crep must remove creq packet from window */
     ucs_assertv_always(!(uct_ud_ep_is_connected(ep) &&
-                         (sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_CTL) &&
+                         (uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) &&
                          !(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_AM)),
                        "ep(%p): CREQ resend on endpoint which is already connected", ep);

@@ -828,26 +834,32 @@ static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface)
 {
     uct_ud_send_skb_t *skb;
+    int flag = 0;

     if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) {
         skb = uct_ud_ep_prepare_creq(ep);
         if (skb) {
-            /* creq allocates real skb, it must be put on window like
-             * a regular packet to ensure a retransmission.
-             */
-            ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb,
-                                                                         1);
-            uct_ud_iface_complete_tx_skb(iface, ep, skb);
+            flag = 1;
             uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
         }
-        return;
     } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREP)) {
         skb = uct_ud_ep_prepare_crep(ep);
+        if (skb) {
+            flag = 1;
+            uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREP);
+        }
     } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_RESEND)) {
         skb = uct_ud_ep_resend(ep);
     } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK)) {
-        skb = &iface->tx.skb_inl.super;
-        uct_ud_neth_ctl_ack(ep, skb->neth);
+        if (uct_ud_ep_is_connected(ep)) {
+            skb = &iface->tx.skb_inl.super;
+            uct_ud_neth_ctl_ack(ep, skb->neth);
+        } else {
+            /* Do not send ACKs if the ep is not connected yet. This may happen
+             * if CREQ and CREP from the peer are lost; wait for the peer to
+             * resend its CREP. */
+            skb = NULL;
+        }
         uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK);
     } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK_REQ)) {
         skb = &iface->tx.skb_inl.super;
@@ -865,8 +877,15 @@ static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface)
     }

     VALGRIND_MAKE_MEM_DEFINED(skb, sizeof *skb);
-    ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb, 0);
-    uct_ud_iface_res_skb_put(iface, skb);
+    ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb, flag);
+    if (flag) {
+        /* creq and crep allocate real skb, it must be put on window like
+         * a regular packet to ensure a retransmission.
+         */
+        uct_ud_iface_complete_tx_skb(iface, ep, skb);
+    } else {
+        uct_ud_iface_res_skb_put(iface, skb);
+    }
 }

 static inline ucs_arbiter_cb_result_t
diff --git a/test/gtest/uct/test_ud.cc b/test/gtest/uct/test_ud.cc
index a71d43b98a5..90df5647ce1 100644
--- a/test/gtest/uct/test_ud.cc
+++ b/test/gtest/uct/test_ud.cc
@@ -65,13 +65,22 @@ class test_ud : public ud_base_test {
     }

     static int tx_count;
-
+
     static ucs_status_t count_tx(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
     {
         tx_count++;
         return UCS_OK;
     }

+    static ucs_status_t invalidate_creq_tx(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
+    {
+        if ((neth->packet_type & UCT_UD_PACKET_FLAG_CTL) &&
+            (uct_ud_neth_get_dest_id(neth) == UCT_UD_EP_NULL_ID)) {
+            uct_ud_neth_set_dest_id(neth, 0xbeef);
+        }
+        return UCS_OK;
+    }
+
     void validate_flush() {
         /* 1 packets transmitted, 1 packets received */
         EXPECT_EQ(2, ep(m_e1)->tx.psn);
@@ -200,8 +209,8 @@ UCS_TEST_P(test_ud, flush_iface) {
 #ifdef UCT_UD_EP_DEBUG_HOOKS

 /* disable ack req,
- * send full window,
- * should not be able to send some more
+ * send full window,
+ * should not be able to send some more
  */
 UCS_TEST_P(test_ud, tx_window2) {
     unsigned i, N=13;
@@ -284,14 +293,13 @@ UCS_TEST_P(test_ud, crep_drop1) {
     EXPECT_EQ(0U, ep(m_e1, 0)->conn_id);
     EXPECT_EQ(2, ep(m_e1, 0)->tx.psn);
-    EXPECT_EQ(0, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));
-
+    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));
+
     check_connection();
 }

 /* check that creq is not left on tx window if
- * both sides connect simultaniously. That is
- * creq is treated like crep if creq was sent
+ * both sides connect simultaneously.
  */
 UCS_TEST_P(test_ud, crep_drop2) {
     m_e1->connect_to_iface(0, *m_e2);
@@ -301,19 +309,20 @@ UCS_TEST_P(test_ud, crep_drop2) {
     ep(m_e2, 0)->rx.rx_hook = drop_ctl;
     short_progress_loop();

-    /* expect creq sent and empty window */
+    /* Expect that creq and crep are sent and window is empty */
     EXPECT_EQ(1, ep(m_e1, 0)->tx.acked_psn);
-    EXPECT_EQ(2, ep(m_e1, 0)->tx.psn);
+    EXPECT_EQ(3, ep(m_e1, 0)->tx.psn);
     EXPECT_EQ(1, ep(m_e2, 0)->tx.acked_psn);
-    EXPECT_EQ(2, ep(m_e2, 0)->tx.psn);
+    EXPECT_EQ(3, ep(m_e2, 0)->tx.psn);
 }

 UCS_TEST_P(test_ud, creq_flush) {
     ucs_status_t status;

     m_e1->connect_to_iface(0, *m_e2);
-    /* setup filter to drop crep */
-    ep(m_e1, 0)->rx.rx_hook = drop_ctl;
+    /* Set up filter to drop all packets. We have to drop CREP
+     * and ACK_REQ packets. */
+    ep(m_e1, 0)->rx.rx_hook = drop_rx;
     short_progress_loop(50);
     /* do flush while ep is being connected it must return in progress */
     status = uct_iface_flush(m_e1->iface(), 0, NULL);
@@ -335,25 +344,25 @@ UCS_TEST_P(test_ud, ca_ai) {
     connect();
     EXPECT_EQ(UCT_UD_CA_MIN_WINDOW, ep(m_e1)->ca.cwnd);
     EXPECT_EQ(UCT_UD_CA_MIN_WINDOW, ep(m_e2)->ca.cwnd);
-
+
     ep(m_e1, 0)->rx.rx_hook = count_rx_acks;
     prev_cwnd = ep(m_e1)->ca.cwnd;
     rx_ack_count = 0;

-    /* window increase upto max window should
+    /* window increase upto max window should
      * happen when we receive acks */
     while(ep(m_e1)->ca.cwnd < max_window) {
         status = tx(m_e1);
         if (status != UCS_OK) {
             progress();
             /* it is possible to get no acks if tx queue is full.
-             * But no more than 2 acks per window.
-             * One at 1/4 and one at the end
+             * But no more than 2 acks per window.
+             * One at 1/4 and one at the end
              *
              * every new ack should cause window increase */
-            EXPECT_LE(rx_ack_count, 2);
-            EXPECT_EQ(rx_ack_count,
+            EXPECT_LE(rx_ack_count, 2);
+            EXPECT_EQ(rx_ack_count,
                       UCT_UD_CA_AI_VALUE * (ep(m_e1)->ca.cwnd - prev_cwnd));
             prev_cwnd = ep(m_e1)->ca.cwnd;
             rx_ack_count = 0;

@@ -371,9 +380,9 @@ UCS_TEST_P(test_ud, ca_md) {

     connect();

-    /* assume we are at the max window
+    /* assume we are at the max window
      * on receive drop all packets. After several retransmission
-     * attempts the window will be reduced to the minumum
+     * attempts the window will be reduced to the minumum
      */
     max_window = RUNNING_ON_VALGRIND ? 64 : UCT_UD_CA_MAX_WINDOW;
     iters = RUNNING_ON_VALGRIND ? 0 : 1;
@@ -436,9 +445,9 @@ UCS_TEST_P(test_ud, ca_resend) {
      * 4 packets will be retransmitted
      * first packet will have ack_req,
      * there will 2 ack_reqs
-     * in addition there may be up to two
+     * in addition there may be up to two
      * standalone ack_reqs
-     */
+     */
     disable_async(m_e1);
     disable_async(m_e2);
     short_progress_loop(100);
@@ -459,7 +468,7 @@ UCS_TEST_P(test_ud, connect_iface_single) {

     EXPECT_EQ(2, ep(m_e1, 0)->tx.psn);
     EXPECT_EQ(1, ep(m_e1, 0)->tx.acked_psn);
-    EXPECT_EQ(0, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));
+    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));

     check_connection();
 }
@@ -473,12 +482,12 @@ UCS_TEST_P(test_ud, connect_iface_2to1) {

     EXPECT_EQ(0U, ep(m_e1,0)->dest_ep_id);
     EXPECT_EQ(0U, ep(m_e1,0)->conn_id);
     EXPECT_EQ(2, ep(m_e1,0)->tx.psn);
-    EXPECT_EQ(0, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));
+    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));

     EXPECT_EQ(1U, ep(m_e1,1)->dest_ep_id);
     EXPECT_EQ(1U, ep(m_e1,1)->conn_id);
     EXPECT_EQ(2, ep(m_e1,1)->tx.psn);
-    EXPECT_EQ(0, ucs_frag_list_sn(&ep(m_e1, 1)->rx.ooo_pkts));
+    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 1)->rx.ooo_pkts));
 }

@@ -488,7 +497,8 @@ UCS_TEST_P(test_ud, connect_iface_seq) {
     EXPECT_EQ(0U, ep(m_e1)->dest_ep_id);
     EXPECT_EQ(0U, ep(m_e1)->conn_id);
     EXPECT_EQ(2, ep(m_e1)->tx.psn);
-    EXPECT_EQ(0, ucs_frag_list_sn(&ep(m_e1)->rx.ooo_pkts));
+    /* one because of crep */
+    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1)->rx.ooo_pkts));

     /* now side two connects. existing ep will be reused */
     m_e2->connect_to_iface(0, *m_e1);
@@ -496,7 +506,7 @@ UCS_TEST_P(test_ud, connect_iface_seq) {
     EXPECT_EQ(0U, ep(m_e2)->dest_ep_id);
     EXPECT_EQ(0U, ep(m_e2)->ep_id);
     EXPECT_EQ(0U, ep(m_e2)->conn_id);
-    EXPECT_EQ(1, ep(m_e2)->tx.psn);
+    EXPECT_EQ(2, ep(m_e2)->tx.psn); /* one because creq sets initial psn */

     EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e2)->rx.ooo_pkts));

@@ -516,7 +526,7 @@ UCS_TEST_P(test_ud, connect_iface_sim) {
     EXPECT_EQ(0U, ep(m_e2)->dest_ep_id);
     EXPECT_EQ(0U, ep(m_e2)->ep_id);
     EXPECT_EQ(0U, ep(m_e2)->conn_id);
-
+
     /* psns are not checked because it really depends on scheduling */
 }

@@ -535,7 +545,7 @@ UCS_TEST_P(test_ud, connect_iface_sim2v2) {
     EXPECT_EQ(0U, ep(m_e2)->dest_ep_id);
     EXPECT_EQ(0U, ep(m_e2)->ep_id);
     EXPECT_EQ(0U, ep(m_e2)->conn_id);
-
+
     EXPECT_EQ(1U, ep(m_e1,1)->dest_ep_id);
     EXPECT_EQ(1U, ep(m_e1,1)->conn_id);
     EXPECT_EQ(1U, ep(m_e1,1)->ep_id);
@@ -652,7 +662,7 @@ UCS_TEST_P(test_ud, ep_destroy_creq) {
     EXPECT_EQ(1U, ud_ep->ep_id);
 }

-/* check that the amount of reserved skbs is not less than
+/* check that the amount of reserved skbs is not less than
  * iface tx queue len
  */
 UCS_TEST_P(test_ud, res_skb_basic) {
@@ -667,13 +677,13 @@ UCS_TEST_P(test_ud, res_skb_basic) {

     uct_ud_send_skb_t *used_skbs[tx_qlen];

-    for (i = 0; i < tx_qlen; i++) {
+    for (i = 0; i < tx_qlen; i++) {
         skb = uct_ud_iface_res_skb_get(ud_if);
         ASSERT_TRUE(skb);
         used_skbs[i] = skb;
     }

-    for (i = 0; i < tx_qlen; i++) {
+    for (i = 0; i < tx_qlen; i++) {
         uct_ud_iface_res_skb_put(ud_if, used_skbs[i]);
     }
 }
@@ -729,6 +739,39 @@ UCS_TEST_P(test_ud, res_skb_tx) {
     }
 }

+/* Simulate loss of ctl packets during simultaneous CREQs.
+ * Use-case: CREQ and CREP packets from m_e2 to m_e1 are lost.
+ * Check: both eps (m_e1 and m_e2) eventually become connected */
+UCS_TEST_P(test_ud, ctls_loss) {
+
+    iface(m_e2)->tx.available = 0;
+    m_e1->connect_to_iface(0, *m_e2);
+    m_e2->connect_to_iface(0, *m_e1);
+
+    /* Simulate loss of CREQ to m_e1 */
+    ep(m_e2)->tx.tx_hook = invalidate_creq_tx;
+    iface(m_e2)->tx.available = 128;
+    iface(m_e1)->tx.available = 128;
+
+    /* Simulate loss of CREP to m_e1 */
+    ep(m_e1)->rx.rx_hook = drop_ctl;
+    short_progress_loop(300);
+
+    /* m_e2 ep should be connected by now, since it received the CREQ which
+     * counters its own CREQ. So, send a packet to m_e1 (which is not
+     * connected yet) */
+    ep(m_e2)->tx.tx_hook = uct_ud_ep_null_hook;
+    set_tx_win(m_e2, 128);
+    EXPECT_UCS_OK(tx(m_e2));
+    short_progress_loop();
+    ep(m_e1)->rx.rx_hook = uct_ud_ep_null_hook;
+    twait(500);
+
+    EXPECT_TRUE(uct_ud_ep_is_connected(ep(m_e1)));
+    EXPECT_TRUE(uct_ud_ep_is_connected(ep(m_e2)));
+}
+
+
 _UCT_INSTANTIATE_TEST_CASE(test_ud, ud)
 _UCT_INSTANTIATE_TEST_CASE(test_ud, ud_mlx5)

From 4f12f8d05abcd0d0279ba9625a62e2bbf45ea9b2 Mon Sep 17 00:00:00 2001
From: Mikhail Brinskii
Date: Thu, 8 Sep 2016 15:30:43 +0300
Subject: [PATCH 2/2] UCT/UD: Applied review comments to the CREQ assert fix

---
 src/uct/ib/ud/base/ud_ep.c | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/uct/ib/ud/base/ud_ep.c b/src/uct/ib/ud/base/ud_ep.c
index 2a5b93080a0..c6dacde753e 100644
--- a/src/uct/ib/ud/base/ud_ep.c
+++ b/src/uct/ib/ud/base/ud_ep.c
@@ -478,14 +478,19 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
     uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_RCVD);
 }

-static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep, uct_ud_ctl_hdr_t *ctl)
+static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep, uct_ud_neth_t *neth)
 {
+    uct_ud_ctl_hdr_t *ctl = (uct_ud_ctl_hdr_t*)(neth + 1);
     ucs_trace_func("");
     ucs_assert_always(ctl->type == UCT_UD_PACKET_CREP);
     /* note that duplicate creps are discared earlier */
-    ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID ||
+    ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID ||
                       ep->dest_ep_id == ctl->conn_rep.src_ep_id);
     ep->dest_ep_id = ctl->conn_rep.src_ep_id;
+
+    /* No need to track duplications, CREP always goes
+     * with ACK_REQ flag */
+    ep->rx.ooo_pkts.head_sn = neth->psn;
     ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
     uct_ud_peer_copy(&ep->peer, &ctl->peer);
     uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREP_RCVD);
@@ -579,10 +584,7 @@ void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned b
             goto out;
         }
         if (neth->packet_type & UCT_UD_PACKET_FLAG_CTL) {
-            /* No need to track duplications, CREP always goes
-             * with ACK_REQ flag */
-            ep->rx.ooo_pkts.head_sn = neth->psn;
-            uct_ud_ep_rx_ctl(iface, ep, (uct_ud_ctl_hdr_t *)(neth + 1));
+            uct_ud_ep_rx_ctl(iface, ep, neth);
             goto out;
         }
     }
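
The small, self-contained C program below is not part of the patches and is not UCX code; every name in it (ep_t, ctl_send, ep_progress, drops_left, the "A"/"B" endpoints) is hypothetical. Under those assumptions it sketches the retry behaviour the fix and the new ctls_loss test rely on: an unanswered CREQ stays pending and is retransmitted, a retransmitted CREQ re-triggers the peer's CREP, and an unconnected endpoint sends no bare ACKs, so both sides still converge when the first CREQ and CREP in one direction are lost.

#include <stdio.h>
#include <stdbool.h>

enum pkt_type { PKT_CREQ, PKT_CREP };

typedef struct {
    const char *name;
    bool        connected;  /* saw a ctl packet from the peer        */
    bool        send_creq;  /* our CREQ has not been answered yet    */
    bool        send_crep;  /* we owe the peer a CREP                */
} ep_t;

static int drops_left = 2;  /* lose the first two ctl packets sent by B */

static void ep_rx(ep_t *ep, enum pkt_type type)
{
    ep->connected = true;      /* any ctl packet identifies the peer     */
    ep->send_creq = false;     /* the peer clearly knows about us now    */
    if (type == PKT_CREQ) {
        ep->send_crep = true;  /* a CREQ must be answered with a CREP    */
    }
}

static void ctl_send(ep_t *from, ep_t *to, enum pkt_type type)
{
    const char *pkt = (type == PKT_CREQ) ? "CREQ" : "CREP";

    if ((from->name[0] == 'B') && (drops_left > 0)) {
        drops_left--;          /* simulate the lost CREQ/CREP towards A  */
        printf("%s -> %s: %s dropped\n", from->name, to->name, pkt);
        return;
    }
    printf("%s -> %s: %s delivered\n", from->name, to->name, pkt);
    ep_rx(to, type);
}

static void ep_progress(ep_t *ep, ep_t *peer)
{
    /* Pending ctl ops are retried on every progress round until answered;
     * an unconnected endpoint never sends a bare ACK instead. */
    if (ep->send_crep) {
        ctl_send(ep, peer, PKT_CREP);
        ep->send_crep = false; /* a resent CREQ will re-schedule the CREP */
    } else if (ep->send_creq) {
        ctl_send(ep, peer, PKT_CREQ);
    }
}

int main(void)
{
    ep_t a = { "A", false, true, false };
    ep_t b = { "B", false, true, false };

    for (int round = 0; !(a.connected && b.connected); ++round) {
        printf("round %d\n", round);
        ep_progress(&a, &b);
        ep_progress(&b, &a);
    }
    printf("both endpoints connected\n");
    return 0;
}

Built with any C99 compiler and run, it prints which control packets are dropped or delivered in each round and stops once both sides are connected, which is the same end state the ctls_loss test asserts with uct_ud_ep_is_connected().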