Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Fix hang in MPI_Finalize with UCX_TLS=rc[_x],sm
  • Loading branch information
evgeny-leksikov committed May 23, 2017
1 parent 6f9d5bc commit d2a722a
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 23 deletions.
3 changes: 3 additions & 0 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ void uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep, uct_iface_h tl_iface)
if (iface->err_handler) {
iface->err_handler(iface->err_handler_arg, tl_ep,
UCS_ERR_ENDPOINT_TIMEOUT);
} else {
ucs_error("Error %s was not handled for ep %p",
ucs_status_string(UCS_ERR_ENDPOINT_TIMEOUT), tl_ep);
}
}

Expand Down
17 changes: 11 additions & 6 deletions src/uct/ib/ud/base/ud_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,25 @@ static void uct_ud_ep_slow_timer(ucs_wtimer_t *self)
if (diff > iface->config.peer_timeout) {
iface->super.ops->handle_failure(&iface->super, ep);
return;
} else if (diff > 3*uct_ud_slow_tick()) {
ucs_trace("sceduling resend now: %lu send_time: %lu diff: %lu tick: %lu",
now, ep->tx.send_time, now - ep->tx.send_time, uct_ud_slow_tick());
} else if (diff > 3*iface->async.slow_tick) {
ucs_trace("scheduling resend now: %lu send_time: %lu diff: %lu tick: %lu",
now, ep->tx.send_time, now - ep->tx.send_time,
ep->tx.slow_tick);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ);
uct_ud_ep_ca_drop(ep);
uct_ud_ep_resend_start(iface, ep);
} else if (diff > uct_ud_slow_tick() && uct_ud_ep_is_connected(ep)) {
} else if ((diff > iface->async.slow_tick) && uct_ud_ep_is_connected(ep)) {
/* It is possible that the sender is slow.
* Try to flush the window twice before going into
* full resend mode.
*/
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK_REQ);
}

ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer,
uct_ud_slow_tick());
/* Cool down the timer on rescheduling/resending */
ep->tx.slow_tick *= iface->config.slow_timer_backoff;
ep->tx.slow_tick = ucs_min(ep->tx.slow_tick, iface->config.peer_timeout/3);
ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer, ep->tx.slow_tick);
}

UCS_CLASS_INIT_FUNC(uct_ud_ep_t, uct_ud_iface_t *iface)
Expand All @@ -153,6 +156,7 @@ UCS_CLASS_INIT_FUNC(uct_ud_ep_t, uct_ud_iface_t *iface)
uct_ud_ep_reset(self);
ucs_list_head_init(&self->cep_list);
uct_ud_iface_add_ep(iface, self);
self->tx.slow_tick = iface->async.slow_tick;
ucs_wtimer_init(&self->slow_timer, uct_ud_ep_slow_timer);
ucs_arbiter_group_init(&self->tx.pending.group);
ucs_arbiter_elem_init(&self->tx.pending.elem);
Expand Down Expand Up @@ -402,6 +406,7 @@ uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep,

ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);

ep->tx.slow_tick = iface->async.slow_tick;
ep->tx.send_time = uct_ud_iface_get_async_time(iface);
}

Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/ud/base/ud_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ struct uct_ud_ep {
ucs_queue_head_t window; /* send window: [acked_psn+1, psn-1] */
uct_ud_ep_pending_op_t pending; /* pending ops */
ucs_time_t send_time; /* tx time of last packet */
ucs_time_t slow_tick; /* timeout to trigger slow timer */
UCS_STATS_NODE_DECLARE(stats);
UCT_UD_EP_HOOK_DECLARE(tx_hook);
} tx;
Expand Down
19 changes: 16 additions & 3 deletions src/uct/ib/ud/base/ud_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,16 @@ ucs_status_t uct_ud_iface_complete_init(uct_ud_iface_t *iface)

uct_ud_iface_reserve_skbs(iface, iface->tx.available);

status = ucs_twheel_init(&iface->async.slow_timer, uct_ud_slow_tick() / 4,
/* TODO: make tick configurable */
iface->async.slow_tick = ucs_time_from_msec(100);
status = ucs_twheel_init(&iface->async.slow_timer,
iface->async.slow_tick / 4,
uct_ud_iface_get_async_time(iface));
if (status != UCS_OK) {
goto err;
}

/* TODO: make tick configurable */
status = ucs_async_add_timer(async_mode, uct_ud_slow_tick(),
status = ucs_async_add_timer(async_mode, iface->async.slow_tick,
uct_ud_iface_timer, iface, async,
&iface->async.timer_id);
if (status != UCS_OK) {
Expand Down Expand Up @@ -433,6 +435,14 @@ UCS_CLASS_INIT_FUNC(uct_ud_iface_t, uct_ud_iface_ops_t *ops, uct_md_h md,
self->config.tx_qp_len = config->super.tx.queue_len;
self->config.peer_timeout = ucs_time_from_sec(config->peer_timeout);

if (config->slow_timer_backoff <= 0.) {
ucs_error("The slow timer back off should be > 0 (%lf)",
config->slow_timer_backoff);
return UCS_ERR_INVALID_PARAM;
} else {
self->config.slow_timer_backoff = config->slow_timer_backoff;
}

/* Redefine receive desc release callback */
self->super.release_desc.cb = uct_ud_iface_release_desc;

Expand Down Expand Up @@ -514,6 +524,9 @@ ucs_config_field_t uct_ud_iface_config_table[] = {
ucs_offsetof(uct_ud_iface_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},
{"TIMEOUT", "5.0m", "Transport timeout",
ucs_offsetof(uct_ud_iface_config_t, peer_timeout), UCS_CONFIG_TYPE_TIME},
{"SLOW_TIMER_BACKOFF", "2.0", "Timeout multiplier for resending trigger",
ucs_offsetof(uct_ud_iface_config_t, slow_timer_backoff),
UCS_CONFIG_TYPE_DOUBLE},
{NULL}
};

Expand Down
3 changes: 3 additions & 0 deletions src/uct/ib/ud/base/ud_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
typedef struct uct_ud_iface_config {
uct_ib_iface_config_t super;
double peer_timeout;
double slow_timer_backoff;
} uct_ud_iface_config_t;

struct uct_ud_iface_peer {
Expand Down Expand Up @@ -119,13 +120,15 @@ struct uct_ud_iface {
} tx;
struct {
ucs_time_t peer_timeout;
double slow_timer_backoff;
unsigned tx_qp_len;
unsigned max_inline;
} config;
ucs_ptr_array_t eps;
uct_ud_iface_peer_t *peers[UCT_UD_HASH_SIZE];
struct {
ucs_twheel_t slow_timer;
ucs_time_t slow_tick;
int timer_id;
} async;
};
Expand Down
17 changes: 4 additions & 13 deletions src/uct/ib/ud/base/ud_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,6 @@ uct_ud_ep_get_tx_skb(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
return uct_ud_iface_get_tx_skb(iface, ep);
}

static inline ucs_time_t uct_ud_slow_tick()
{
return ucs_time_from_msec(100);
}

static inline ucs_time_t uct_ud_fast_tick()
{
return ucs_time_from_usec(1024);
}

static UCS_F_ALWAYS_INLINE void
uct_ud_am_set_zcopy_desc(uct_ud_send_skb_t *skb, const uct_iov_t *iov, size_t iovcnt,
uct_completion_t *comp)
Expand Down Expand Up @@ -130,10 +120,11 @@ uct_ud_iface_complete_tx_inl(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
skb->len += length;
memcpy(data, buffer, length);
ucs_queue_push(&ep->tx.window, &skb->queue);
ep->tx.slow_tick = iface->async.slow_tick;
ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer,
uct_ud_iface_get_async_time(iface) -
ucs_twheel_get_time(&iface->async.slow_timer) +
uct_ud_slow_tick());
ep->tx.slow_tick);
ep->tx.send_time = uct_ud_iface_get_async_time(iface);
}

Expand All @@ -144,10 +135,11 @@ uct_ud_iface_complete_tx_skb(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
iface->tx.skb = ucs_mpool_get(&iface->tx.mp);
ep->tx.psn++;
ucs_queue_push(&ep->tx.window, &skb->queue);
ep->tx.slow_tick = iface->async.slow_tick;
ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer,
uct_ud_iface_get_async_time(iface) -
ucs_twheel_get_time(&iface->async.slow_timer) +
uct_ud_slow_tick());
ep->tx.slow_tick);
ep->tx.send_time = uct_ud_iface_get_async_time(iface);
}

Expand Down Expand Up @@ -187,4 +179,3 @@ uct_ud_skb_bcopy(uct_ud_send_skb_t *skb, uct_pack_callback_t pack_cb, void *arg)
skb->len = sizeof(skb->neth[0]) + payload_len;
return payload_len;
}

2 changes: 1 addition & 1 deletion test/gtest/uct/test_ud_slow_timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ UCS_TEST_P(test_ud_slow_timer, retransmitn) {
ep(m_e2)->rx.rx_hook = uct_ud_ep_null_hook;
EXPECT_EQ(N+1, ep(m_e1)->tx.psn);
EXPECT_EQ(0, ucs_frag_list_sn(&ep(m_e2)->rx.ooo_pkts));
twait(500);
twait(500*ucs::test_time_multiplier());
//short_progress_loop();

EXPECT_EQ(N+1, ep(m_e1)->tx.psn);
Expand Down

0 comments on commit d2a722a

Please sign in to comment.