Skip to content

Commit

Permalink
UCT: fix hang when using polling fd openucx#1492
Browse files Browse the repository at this point in the history
  • Loading branch information
evgeny-leksikov committed Jun 2, 2017
1 parent 4962c52 commit 71474ca
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/uct/ib/base/ib_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ UCS_CLASS_INIT_FUNC(uct_ib_iface_t, uct_ib_iface_ops_t *ops, uct_md_h md,
goto err;
}

self->wakeup_events = 0;
self->comp_channel = ibv_create_comp_channel(dev->ibv_context);
if (self->comp_channel == NULL) {
ucs_error("ibv_create_comp_channel() failed: %m");
Expand Down Expand Up @@ -923,6 +924,7 @@ ucs_status_t uct_ib_iface_wakeup_open(uct_iface_h iface, unsigned events,
{
uct_ib_iface_t *ib_iface = ucs_derived_of(iface, uct_ib_iface_t);
wakeup->fd = ib_iface->comp_channel->fd;
ib_iface->wakeup_events = events;
return UCS_OK;
}

Expand All @@ -933,6 +935,8 @@ ucs_status_t uct_ib_iface_wakeup_signal(uct_wakeup_h wakeup)

/**
 * Close a wakeup handle that belongs to an IB interface.
 *
 * Resolves the owning IB interface from @a wakeup->iface and resets its
 * wakeup_events mask to 0. Nothing else is released here.
 *
 * @param wakeup  Wakeup handle whose owning interface is updated.
 */
void uct_ib_iface_wakeup_close(uct_wakeup_h wakeup)
{
    uct_ib_iface_t *owner;

    owner = ucs_derived_of(wakeup->iface, uct_ib_iface_t);

    /* No wakeup events are requested on this interface any more. */
    owner->wakeup_events = 0;
}

static ucs_status_t uct_ib_iface_arm_cq(uct_ib_iface_t *iface, struct ibv_cq *cq,
Expand Down
3 changes: 2 additions & 1 deletion src/uct/ib/base/ib_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ struct uct_ib_iface {

struct ibv_cq *send_cq;
struct ibv_cq *recv_cq;
struct ibv_comp_channel *comp_channel;
uct_recv_desc_t release_desc;
struct ibv_comp_channel *comp_channel;
unsigned wakeup_events;

uint8_t *path_bits;
unsigned path_bits_count;
Expand Down
3 changes: 3 additions & 0 deletions src/uct/ib/dc/accel/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface)
uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci);

iface->super.super.tx.cq_available++;
if (iface->super.super.super.wakeup_events & UCT_WAKEUP_TX_COMPLETION) {
iface->super.super.super.ops->arm_tx_cq(&iface->super.super.super);
}

if (uct_dc_iface_dci_can_alloc(&iface->super)) {
ucs_arbiter_dispatch(uct_dc_iface_dci_waitq(&iface->super), 1,
Expand Down
8 changes: 7 additions & 1 deletion src/uct/ib/dc/verbs/dc_verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,13 @@ uct_dc_verbs_poll_tx(uct_dc_verbs_iface_t *iface)
uct_rc_txqp_completion_desc(&iface->super.tx.dcis[dci].txqp, iface->dcis_txcnt[dci].ci);
}

iface->super.super.tx.cq_available += num_wcs;
if (num_wcs) {
iface->super.super.tx.cq_available += num_wcs;
if (iface->super.super.super.wakeup_events & UCT_WAKEUP_TX_COMPLETION) {
iface->super.super.super.ops->arm_tx_cq(&iface->super.super.super);
}
}

if (uct_dc_iface_dci_can_alloc(&iface->super)) {
ucs_arbiter_dispatch(uct_dc_iface_dci_waitq(&iface->super), 1,
uct_dc_iface_dci_do_pending_wait, NULL);
Expand Down
4 changes: 4 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ uct_rc_mlx5_iface_common_poll_rx(uct_rc_mlx5_iface_common_t *mlx5_common_iface,
}

++rc_iface->rx.srq.available;
if (rc_iface->super.wakeup_events & UCT_WAKEUP_TX_COMPLETION) {
rc_iface->super.ops->arm_tx_cq(&rc_iface->super);
}

status = UCS_OK;

done:
Expand Down
4 changes: 4 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_t *iface)

uct_rc_mlx5_txqp_process_tx_cqe(&ep->super.txqp, cqe, hw_ci);

if (iface->super.super.wakeup_events & UCT_WAKEUP_TX_COMPLETION) {
iface->super.super.ops->arm_tx_cq(&iface->super.super);
}

ucs_arbiter_group_schedule(&iface->super.tx.arbiter, &ep->super.arb_group);
ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending, NULL);
}
Expand Down
4 changes: 4 additions & 0 deletions src/uct/ib/rc/verbs/rc_verbs_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ uct_rc_verbs_iface_poll_rx_common(uct_rc_iface_t *iface)
wc[i].byte_len, wc[i].imm_data, wc[i].slid);
}
iface->rx.srq.available += num_wcs;
if (iface->super.wakeup_events &
(UCT_WAKEUP_RX_AM | UCT_WAKEUP_RX_SIGNALED_AM)) {
iface->super.ops->arm_rx_cq(&iface->super, 0);
}
UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_RC_IFACE_STAT_RX_COMPLETION, num_wcs);

out:
Expand Down
9 changes: 8 additions & 1 deletion src/uct/ib/rc/verbs/rc_verbs_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,14 @@ uct_rc_verbs_iface_poll_tx(uct_rc_verbs_iface_t *iface)
uct_rc_txqp_completion_desc(&ep->super.txqp, ep->txcnt.ci);
ucs_arbiter_group_schedule(&iface->super.tx.arbiter, &ep->super.arb_group);
}
iface->super.tx.cq_available += num_wcs;

if (num_wcs) {
iface->super.tx.cq_available += num_wcs;
if (iface->super.super.wakeup_events & UCT_WAKEUP_TX_COMPLETION) {
iface->super.super.ops->arm_tx_cq(&iface->super.super);
}
}

ucs_arbiter_dispatch(&iface->super.tx.arbiter, 1, uct_rc_ep_process_pending, NULL);
}

Expand Down
5 changes: 5 additions & 0 deletions src/uct/ib/ud/verbs/ud_verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ uct_ud_verbs_iface_poll_rx(uct_ud_verbs_iface_t *iface, int is_async)

}
iface->super.rx.available += num_wcs;
if (iface->super.super.wakeup_events &
(UCT_WAKEUP_RX_AM | UCT_WAKEUP_RX_SIGNALED_AM)) {
iface->super.super.ops->arm_rx_cq(&iface->super.super, 0);
}

out:
uct_ud_verbs_iface_post_recv(iface);
return status;
Expand Down

0 comments on commit 71474ca

Please sign in to comment.