From 477611ba96b1840c4a5a296672f2d084adbb19fb Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Tue, 1 Feb 2022 20:17:36 +0200 Subject: [PATCH] UCP/CORE: Drain keeplive timer fd to not trigger unnecessary events on user's fd --- src/ucp/core/ucp_worker.c | 84 ++++++++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 23 deletions(-) diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 348ea090a0b1..c3957bbe7d74 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -2681,32 +2681,26 @@ ucs_status_t ucp_worker_get_efd(ucp_worker_h worker, int *fd) return status; } -ucs_status_t ucp_worker_arm(ucp_worker_h worker) +static ucs_status_t +ucp_worker_fd_read(ucp_worker_h worker, int fd, const char *fd_name) { - ucp_worker_iface_t *wiface; ucs_status_t status; uint64_t dummy; int ret; - ucs_trace_func("worker=%p", worker); - - UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP, - return UCS_ERR_INVALID_PARAM); - - /* Read from event pipe. If some events are found, return BUSY, - * Otherwise, continue to arm the transport interfaces. - */ do { - ret = read(worker->eventfd, &dummy, sizeof(dummy)); + ret = read(fd, &dummy, sizeof(dummy)); if (ret == sizeof(dummy)) { - ucs_trace("worker %p: extracted queued event", worker); + ucs_trace_poll("worker %p: extracted queued event for fd=%d (%s)", + worker, fd, fd_name); status = UCS_ERR_BUSY; goto out; } else if (ret == -1) { if (errno == EAGAIN) { break; /* No more events */ } else if (errno != EINTR) { - ucs_error("Read from internal event fd failed: %m"); + ucs_error("worker %p: read from fd=%d (%s) failed: %m", + worker, fd, fd_name); status = UCS_ERR_IO_ERROR; goto out; } @@ -2715,25 +2709,64 @@ ucs_status_t ucp_worker_arm(ucp_worker_h worker) } } while (ret != 0); + status = UCS_OK; + +out: + return status; +} + +ucs_status_t ucp_worker_arm(ucp_worker_h worker) +{ + ucp_worker_iface_t *wiface; + ucs_status_t status; + + ucs_trace_func("worker=%p", worker); + + UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP, + return UCS_ERR_INVALID_PARAM); + + /* Read from event pipe. If some events are found, return BUSY, otherwise - + * continue to arm the transport interfaces. + */ + status = ucp_worker_fd_read(worker, worker->eventfd, "internal event fd"); + if (status != UCS_OK) { + return status; + } + + if (worker->keepalive.timerfd >= 0) { + /* Do read() of 8-byte unsigned integer containing the number of + * expirations that have occured to make sure no events will be + * triggered again until timer isn't expired again. + */ + status = ucp_worker_fd_read(worker, worker->keepalive.timerfd, + "keepalive fd"); + if (status != UCS_OK) { + return status; + } + + /* Make sure not missing keepalive rounds after a long time without + * calling UCP worker progress. + */ + UCS_STATIC_ASSERT(ucs_is_pow2_or_zero(UCP_WORKER_KEEPALIVE_ITER_SKIP)); + worker->keepalive.iter_count = + ucs_align_up_pow2(worker->keepalive.iter_count, + UCP_WORKER_KEEPALIVE_ITER_SKIP); + } + UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); /* Go over arm_list of active interfaces which support events and arm them */ ucs_list_for_each(wiface, &worker->arm_ifaces, arm_list) { ucs_assert(wiface->activate_count > 0); status = uct_iface_event_arm(wiface->iface, worker->uct_events); - ucs_trace("arm iface %p returned %s", wiface->iface, - ucs_status_string(status)); + ucs_trace_data("arm iface %p returned %s", wiface->iface, + ucs_status_string(status)); if (status != UCS_OK) { - goto out_unlock; + break; } } - status = UCS_OK; - -out_unlock: UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); -out: - ucs_trace("ucp_worker_arm returning %s", ucs_status_string(status)); return status; } @@ -2943,13 +2976,12 @@ ucp_worker_keepalive_timerfd_init(ucp_worker_h worker) struct timespec ts; int ret; - if (!(worker->context->config.features & UCP_FEATURE_WAKEUP) || (worker->keepalive.timerfd >= 0)) { return; } - worker->keepalive.timerfd = timerfd_create(CLOCK_MONOTONIC, 0); + worker->keepalive.timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); if (worker->keepalive.timerfd < 0) { ucs_warn("worker %p: failed to create keepalive timer fd: %m", worker); @@ -2967,10 +2999,16 @@ ucp_worker_keepalive_timerfd_init(ucp_worker_h worker) "(fd=%d interval=%lu.%06lu) failed: %m", worker, worker->keepalive.timerfd, ts.tv_sec, ts.tv_nsec * UCS_NSEC_PER_USEC); + goto err_close_timerfd; } ucp_worker_wakeup_ctl_fd(worker, UCP_WORKER_EPFD_OP_ADD, worker->keepalive.timerfd); + + return; + +err_close_timerfd: + close(worker->keepalive.timerfd); } static UCS_F_ALWAYS_INLINE void