diff --git a/src/uct/ib/ud/base/ud_ep.c b/src/uct/ib/ud/base/ud_ep.c index 473005ab7e5..acfcc2ef9bb 100644 --- a/src/uct/ib/ud/base/ud_ep.c +++ b/src/uct/ib/ud/base/ud_ep.c @@ -186,6 +186,19 @@ static void uct_ud_ep_slow_timer(ucs_wtimer_t *self) ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer, ep->tx.slow_tick); } +#if HAVE_HNS_ROCE +static void uct_ud_ep_pskb_free(uct_ud_ep_t *ep) +{ + uct_ud_send_skb_t *skb; + + ucs_queue_for_each_extract(skb, &ep->pending_skb, queue, 1) { + ucs_mpool_put(skb); + } +} +#else +#define uct_ud_ep_pskb_free(ep) +#endif + UCS_CLASS_INIT_FUNC(uct_ud_ep_t, uct_ud_iface_t *iface) { ucs_trace_func(""); @@ -199,6 +212,9 @@ UCS_CLASS_INIT_FUNC(uct_ud_ep_t, uct_ud_iface_t *iface) uct_ud_iface_add_ep(iface, self); self->tx.slow_tick = iface->async.slow_tick; ucs_wtimer_init(&self->slow_timer, uct_ud_ep_slow_timer); +#if HAVE_HNS_ROCE + ucs_queue_head_init(&self->pending_skb); +#endif ucs_arbiter_group_init(&self->tx.pending.group); ucs_arbiter_elem_init(&self->tx.pending.elem); @@ -238,6 +254,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_ud_ep_t) ucs_trace_func("ep=%p id=%d conn_id=%d", self, self->ep_id, self->conn_id); + uct_ud_ep_pskb_free(self); ucs_wtimer_remove(&self->slow_timer); uct_ud_iface_remove_ep(iface, self); uct_ud_iface_cep_remove(self); @@ -426,7 +443,7 @@ uct_ud_iface_add_async_comp(uct_ud_iface_t *iface, uct_ud_ep_t *ep, static UCS_F_ALWAYS_INLINE void uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep, - uct_ud_psn_t ack_psn, int is_async) + uct_ud_psn_t ack_psn, int is_async, int dummy_ack) { uct_ud_send_skb_t *skb; if (ucs_unlikely(UCT_UD_PSN_COMPARE(ack_psn, <=, ep->tx.acked_psn))) { @@ -448,7 +465,12 @@ uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep, } skb->flags = 0; /* reset also ACK_REQ flag */ - ucs_mpool_put(skb); +#if HAVE_HNS_ROCE + if (dummy_ack) + ucs_queue_push(&ep->pending_skb, &skb->queue); + else +#endif + ucs_mpool_put(skb); } uct_ud_ep_ca_ack(ep); @@ -529,7 +551,7 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth) /* our own creq was sent, treat incoming creq as ack and remove our own * from tx window */ - uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0); + uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0, 1); } uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP); } @@ -549,6 +571,7 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth) /* scedule connection reply op */ UCT_UD_EP_HOOK_CALL_RX(ep, neth, sizeof(*neth) + sizeof(*ctl)); if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) { + uct_ud_ep_pskb_free(ep); uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_NOTSENT); } uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ); @@ -570,6 +593,8 @@ static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep, return; } + uct_ud_ep_pskb_free(ep); + ep->rx.ooo_pkts.head_sn = neth->psn; ep->dest_ep_id = ctl->conn_rep.src_ep_id; ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group); @@ -663,7 +688,7 @@ void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned b ucs_assert(ep->ep_id != UCT_UD_EP_NULL_ID); UCT_UD_EP_HOOK_CALL_RX(ep, neth, byte_len); - uct_ud_ep_process_ack(iface, ep, neth->ack_psn, is_async); + uct_ud_ep_process_ack(iface, ep, neth->ack_psn, is_async, 0); if (ucs_unlikely(neth->packet_type & UCT_UD_PACKET_FLAG_ACK_REQ)) { uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK); diff --git a/src/uct/ib/ud/base/ud_ep.h b/src/uct/ib/ud/base/ud_ep.h index 7100347b9a3..206e8100486 100644 --- a/src/uct/ib/ud/base/ud_ep.h +++ b/src/uct/ib/ud/base/ud_ep.h @@ -251,6 +251,9 @@ struct uct_ud_ep { uint8_t rx_creq_count; /* TODO: remove when reason for DUP/OOO CREQ is found */ ucs_wtimer_t slow_timer; ucs_time_t close_time; /* timestamp of closure */ +#if HAVE_HNS_ROCE + ucs_queue_head_t pending_skb; +#endif UCS_STATS_NODE_DECLARE(stats); UCT_UD_EP_HOOK_DECLARE(timer_hook); #if ENABLE_DEBUG_DATA