Skip to content

Commit

Permalink
Merge pull request #7887 from dmitrygx/topic/ucp/keepalive_wakeup_1_12
Browse files Browse the repository at this point in the history
UCP/CORE: Drain keepalive timer fd to not trigger unnecessary events on user's fd [v1.12.x]
  • Loading branch information
yosefe authored Feb 2, 2022
2 parents bd4170f + 7fdb958 commit 11bf7e9
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 23 deletions.
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
### Bugfixes
* Fixed memory hooks for Cuda 11.5
* Fixed memory type cache merge
* Fixed continuously triggering wakeup fd when keepalive is used

## 1.12.0 (January 12, 2022)
### Features:
Expand Down
84 changes: 61 additions & 23 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2681,32 +2681,26 @@ ucs_status_t ucp_worker_get_efd(ucp_worker_h worker, int *fd)
return status;
}

ucs_status_t ucp_worker_arm(ucp_worker_h worker)
static ucs_status_t
ucp_worker_fd_read(ucp_worker_h worker, int fd, const char *fd_name)
{
ucp_worker_iface_t *wiface;
ucs_status_t status;
uint64_t dummy;
int ret;

ucs_trace_func("worker=%p", worker);

UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP,
return UCS_ERR_INVALID_PARAM);

/* Read from event pipe. If some events are found, return BUSY,
* Otherwise, continue to arm the transport interfaces.
*/
do {
ret = read(worker->eventfd, &dummy, sizeof(dummy));
ret = read(fd, &dummy, sizeof(dummy));
if (ret == sizeof(dummy)) {
ucs_trace("worker %p: extracted queued event", worker);
ucs_trace_poll("worker %p: extracted queued event for fd=%d (%s)",
worker, fd, fd_name);
status = UCS_ERR_BUSY;
goto out;
} else if (ret == -1) {
if (errno == EAGAIN) {
break; /* No more events */
} else if (errno != EINTR) {
ucs_error("Read from internal event fd failed: %m");
ucs_error("worker %p: read from fd=%d (%s) failed: %m",
worker, fd, fd_name);
status = UCS_ERR_IO_ERROR;
goto out;
}
Expand All @@ -2715,25 +2709,64 @@ ucs_status_t ucp_worker_arm(ucp_worker_h worker)
}
} while (ret != 0);

status = UCS_OK;

out:
return status;
}

/**
 * Prepare the worker for waiting on its wakeup file descriptors.
 *
 * The function first drains the worker's internal event fd and, when a
 * keepalive timer fd exists (timerfd >= 0), the keepalive timer fd as well.
 * It then walks worker->arm_ifaces and calls uct_iface_event_arm() on each
 * interface with worker->uct_events.
 *
 * @param worker  Worker to arm.
 *
 * @return UCS_ERR_INVALID_PARAM is the failure action handed to the
 *         UCP_FEATURE_WAKEUP feature check. Any status other than UCS_OK
 *         reported by ucp_worker_fd_read() for either fd is returned
 *         unchanged, before any interface is armed. Otherwise the value of
 *         `status` after the interface-arming section is returned.
 */
ucs_status_t ucp_worker_arm(ucp_worker_h worker)
{
ucp_worker_iface_t *wiface;
ucs_status_t status;

ucs_trace_func("worker=%p", worker);

/* The wakeup feature is checked on the worker's context; the macro is given
 * `return UCS_ERR_INVALID_PARAM` as the action to take on failure.
 */
UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP,
return UCS_ERR_INVALID_PARAM);

/* Read from the internal event fd. If the read helper reports anything
 * other than UCS_OK (for example, BUSY because a queued event was found),
 * return that status; otherwise continue to arm the transport interfaces.
 */
status = ucp_worker_fd_read(worker, worker->eventfd, "internal event fd");
if (status != UCS_OK) {
return status;
}

if (worker->keepalive.timerfd >= 0) {
/* Read the 8-byte unsigned integer holding the number of timer
 * expirations that have occurred, so that the timer fd does not trigger
 * further events until the timer expires again.
 */
status = ucp_worker_fd_read(worker, worker->keepalive.timerfd,
"keepalive fd");
if (status != UCS_OK) {
return status;
}

/* Make sure keepalive rounds are not missed after a long time without
 * calling UCP worker progress: align the iteration counter up to
 * UCP_WORKER_KEEPALIVE_ITER_SKIP. The static assertion documents that the
 * alignment value is expected to be a power of two (or zero).
 */
UCS_STATIC_ASSERT(ucs_is_pow2_or_zero(UCP_WORKER_KEEPALIVE_ITER_SKIP));
worker->keepalive.iter_count =
ucs_align_up_pow2(worker->keepalive.iter_count,
UCP_WORKER_KEEPALIVE_ITER_SKIP);
}

UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

/* Go over arm_list of active interfaces which support events and arm them.
 * Every interface on the list is asserted to have a positive
 * activate_count.
 */
ucs_list_for_each(wiface, &worker->arm_ifaces, arm_list) {
ucs_assert(wiface->activate_count > 0);
status = uct_iface_event_arm(wiface->iface, worker->uct_events);
ucs_trace("arm iface %p returned %s", wiface->iface,
ucs_status_string(status));
ucs_trace_data("arm iface %p returned %s", wiface->iface,
ucs_status_string(status));
if (status != UCS_OK) {
goto out_unlock;
break;
}
}

status = UCS_OK;

out_unlock:
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
out:
ucs_trace("ucp_worker_arm returning %s", ucs_status_string(status));
return status;
}

Expand Down Expand Up @@ -2943,13 +2976,12 @@ ucp_worker_keepalive_timerfd_init(ucp_worker_h worker)
struct timespec ts;
int ret;


if (!(worker->context->config.features & UCP_FEATURE_WAKEUP) ||
(worker->keepalive.timerfd >= 0)) {
return;
}

worker->keepalive.timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
worker->keepalive.timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (worker->keepalive.timerfd < 0) {
ucs_warn("worker %p: failed to create keepalive timer fd: %m",
worker);
Expand All @@ -2967,10 +2999,16 @@ ucp_worker_keepalive_timerfd_init(ucp_worker_h worker)
"(fd=%d interval=%lu.%06lu) failed: %m", worker,
worker->keepalive.timerfd, ts.tv_sec,
ts.tv_nsec * UCS_NSEC_PER_USEC);
goto err_close_timerfd;
}

ucp_worker_wakeup_ctl_fd(worker, UCP_WORKER_EPFD_OP_ADD,
worker->keepalive.timerfd);

return;

err_close_timerfd:
close(worker->keepalive.timerfd);
}

static UCS_F_ALWAYS_INLINE void
Expand Down

0 comments on commit 11bf7e9

Please sign in to comment.