Skip to content

Commit

Permalink
Merge pull request #7854 from yosefe/topic/ucp-ep-release-ep-removed-…
Browse files Browse the repository at this point in the history
…request-if

UCT/UD: Fix completion callback not called for flush operation
  • Loading branch information
yosefe authored Jan 21, 2022
2 parents e0b0559 + 70b18d5 commit ea16f70
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
8 changes: 8 additions & 0 deletions src/uct/ib/ud/base/ud_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ static void uct_ud_ep_purge_outstanding(uct_ud_ep_t *ep)
uct_ud_ctl_desc_t *cdesc;
ucs_queue_iter_t iter;

ucs_trace_func("ep=%p", ep);

ucs_queue_for_each_safe(cdesc, iter, &iface->tx.outstanding_q, queue) {
if (cdesc->ep == ep) {
ucs_queue_del_iter(&iface->tx.outstanding_q, iter);
Expand All @@ -230,6 +232,8 @@ static void uct_ud_ep_purge(uct_ud_ep_t *ep, ucs_status_t status)
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_ud_iface_t);

ucs_trace_func("ep=%p", ep);

uct_ud_iface_dispatch_async_comps(iface, ep);

/* reset the maximal TX psn value to the default, since we should be able
Expand Down Expand Up @@ -1111,6 +1115,8 @@ ucs_status_t uct_ud_ep_flush(uct_ep_h ep_h, unsigned flags,
uct_ud_iface_t);
ucs_status_t status;

ucs_trace_func("ep=%p", ep);

uct_ud_enter(iface);

if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
Expand Down Expand Up @@ -1674,6 +1680,8 @@ void uct_ud_ep_pending_purge(uct_ep_h ep_h, uct_pending_purge_callback_t cb,
uct_ud_iface_t);
uct_purge_cb_args_t args = {cb, arg};

ucs_trace_func("ep=%p", ep);

uct_ud_enter(iface);
ucs_arbiter_group_purge(&iface->tx.pending_q, &ep->tx.pending.group,
uct_ud_ep_pending_purge_cb, &args);
Expand Down
12 changes: 8 additions & 4 deletions src/uct/ib/ud/base/ud_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -852,19 +852,23 @@ unsigned
uct_ud_iface_dispatch_async_comps_do(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
unsigned count = 0;
uct_ud_send_skb_t *skb;
uct_ud_comp_desc_t *cdesc;
uct_ud_send_skb_t *skb;
ucs_queue_iter_t iter;

ucs_queue_for_each_extract(skb, &iface->tx.async_comp_q, queue, 1) {
ucs_trace_func("ep=%p", ep);

ucs_queue_for_each_safe(skb, iter, &iface->tx.async_comp_q, queue) {
ucs_assert(!(skb->flags & UCT_UD_SEND_SKB_FLAG_RESENDING));
cdesc = uct_ud_comp_desc(skb);
ucs_assert(cdesc->ep != NULL);

if ((ep == NULL) || (ep == cdesc->ep)) {
ucs_trace("ep %p: dispatch async comp %p", ep, cdesc->comp);
ucs_queue_del_iter(&iface->tx.async_comp_q, iter);
uct_ud_iface_dispatch_comp(iface, cdesc->comp);
uct_ud_skb_release(skb, 0);
++count;
}
++count;
}

return count;
Expand Down

0 comments on commit ea16f70

Please sign in to comment.