Skip to content

Commit

Permalink
Merge pull request #953 from brminich/topic/uct_ud_fix_conns
Browse files Browse the repository at this point in the history
UCT/UD: Fix issue N544 (CREQ assert)
  • Loading branch information
yosefe authored Sep 11, 2016
2 parents ea19a98 + 4f12f8d commit eda2c58
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 53 deletions.
63 changes: 42 additions & 21 deletions src/uct/ib/ud/base/ud_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "ud_ep.h"
#include "ud_iface.h"
#include "ud_inl.h"
#include "ud_def.h"

#include <uct/ib/base/ib_verbs.h>
#include <ucs/debug/memtrack.h>
Expand Down Expand Up @@ -359,13 +360,12 @@ uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
{
uct_ud_comp_desc_t *cdesc;
uct_ud_send_skb_t *skb;

if (ucs_unlikely(UCT_UD_PSN_COMPARE(ack_psn, <=, ep->tx.acked_psn))) {
return;
}

ep->tx.acked_psn = ack_psn;

/* Release acknowledged skb's */
ucs_queue_for_each_extract(skb, &ep->tx.window, queue,
UCT_UD_PSN_COMPARE(skb->neth->psn, <=, ack_psn)) {
Expand Down Expand Up @@ -446,6 +446,7 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
ucs_assert_always(ep != NULL);
ep->rx.ooo_pkts.head_sn = neth->psn;
uct_ud_peer_copy(&ep->peer, &ctl->peer);
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_PRIVATE);
} else {
if (ep->dest_ep_id == UCT_UD_EP_NULL_ID) {
Expand All @@ -463,6 +464,7 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
*/
uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0);
}
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
}
}

Expand All @@ -472,19 +474,23 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
ucs_assert_always(ep->rx.ooo_pkts.head_sn == neth->psn);
/* schedule connection reply op */
UCT_UD_EP_HOOK_CALL_RX(ep, neth, sizeof(*neth) + sizeof(*ctl));
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_RCVD);
}

static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep, uct_ud_ctl_hdr_t *ctl)
static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep, uct_ud_neth_t *neth)
{
uct_ud_ctl_hdr_t *ctl = (uct_ud_ctl_hdr_t*)(neth + 1);
ucs_trace_func("");
ucs_assert_always(ctl->type == UCT_UD_PACKET_CREP);
/* note that duplicate creps are discarded earlier */
ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID ||
ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID ||
ep->dest_ep_id == ctl->conn_rep.src_ep_id);
ep->dest_ep_id = ctl->conn_rep.src_ep_id;

/* No need to track duplications, CREP always goes
* with ACK_REQ flag */
ep->rx.ooo_pkts.head_sn = neth->psn;
ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
uct_ud_peer_copy(&ep->peer, &ctl->peer);
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREP_RCVD);
Expand Down Expand Up @@ -578,7 +584,7 @@ void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned b
goto out;
}
if (neth->packet_type & UCT_UD_PACKET_FLAG_CTL) {
uct_ud_ep_rx_ctl(iface, ep, (uct_ud_ctl_hdr_t *)(neth + 1));
uct_ud_ep_rx_ctl(iface, ep, neth);
goto out;
}
}
Expand Down Expand Up @@ -740,8 +746,10 @@ static uct_ud_send_skb_t *uct_ud_ep_prepare_crep(uct_ud_ep_t *ep)
ucs_assert_always(ep->dest_ep_id != UCT_UD_EP_NULL_ID);
ucs_assert_always(ep->ep_id != UCT_UD_EP_NULL_ID);

skb = uct_ud_iface_res_skb_get(iface);
ucs_assert_always(skb != NULL);
skb = uct_ud_iface_get_tx_skb(iface, ep);
if (!skb) {
return NULL;
}

neth = skb->neth;
uct_ud_neth_init_data(ep, neth);
Expand Down Expand Up @@ -784,9 +792,9 @@ static uct_ud_send_skb_t *uct_ud_ep_resend(uct_ud_ep_t *ep)
return NULL;
}

/* creq or crep must remove creq packet from window */
/* creq/crep must remove creq packet from window */
ucs_assertv_always(!(uct_ud_ep_is_connected(ep) &&
(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_CTL) &&
(uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) &&
!(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_AM)),
"ep(%p): CREQ resend on endpoint which is already connected", ep);

Expand Down Expand Up @@ -828,26 +836,32 @@ static uct_ud_send_skb_t *uct_ud_ep_resend(uct_ud_ep_t *ep)
static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface)
{
uct_ud_send_skb_t *skb;
int flag = 0;

if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) {
skb = uct_ud_ep_prepare_creq(ep);
if (skb) {
/* creq allocates real skb, it must be put on window like
* a regular packet to ensure a retransmission.
*/
ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb,
1);
uct_ud_iface_complete_tx_skb(iface, ep, skb);
flag = 1;
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
}
return;
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREP)) {
skb = uct_ud_ep_prepare_crep(ep);
if (skb) {
flag = 1;
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREP);
}
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_RESEND)) {
skb = uct_ud_ep_resend(ep);
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK)) {
skb = &iface->tx.skb_inl.super;
uct_ud_neth_ctl_ack(ep, skb->neth);
if (uct_ud_ep_is_connected(ep)) {
skb = &iface->tx.skb_inl.super;
uct_ud_neth_ctl_ack(ep, skb->neth);
} else {
/* Do not send ACKs if not connected yet. It may happen if
* CREQ and CREP from peer are lost. Need to wait for CREP
* resending by peer. */
skb = NULL;
}
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK);
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK_REQ)) {
skb = &iface->tx.skb_inl.super;
Expand All @@ -865,8 +879,15 @@ static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface)
}

VALGRIND_MAKE_MEM_DEFINED(skb, sizeof *skb);
ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb, 0);
uct_ud_iface_res_skb_put(iface, skb);
ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb, flag);
if (flag) {
/* creq and crep allocate a real skb; it must be put on the window like
* a regular packet to ensure retransmission.
*/
uct_ud_iface_complete_tx_skb(iface, ep, skb);
} else {
uct_ud_iface_res_skb_put(iface, skb);
}
}

static inline ucs_arbiter_cb_result_t
Expand Down
Loading

0 comments on commit eda2c58

Please sign in to comment.