From b97efd4de9b34bff850c8548b9895972f3d141eb Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 10 Oct 2017 12:41:49 +0300 Subject: [PATCH 1/2] UCP: Move flush-related functions to a separate file. --- src/ucp/Makefile.am | 1 + src/ucp/core/ucp_ep.c | 246 +------------------------------- src/ucp/core/ucp_ep.h | 4 + src/ucp/rma/basic_rma.c | 50 ------- src/ucp/rma/flush.c | 306 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 312 insertions(+), 295 deletions(-) create mode 100644 src/ucp/rma/flush.c diff --git a/src/ucp/Makefile.am b/src/ucp/Makefile.am index a0533dadab9..3fc1aa595f5 100644 --- a/src/ucp/Makefile.am +++ b/src/ucp/Makefile.am @@ -64,6 +64,7 @@ libucp_la_SOURCES = \ dt/dt.c \ proto/proto_am.c \ rma/basic_rma.c \ + rma/flush.c \ tag/eager_rcv.c \ tag/eager_snd.c \ tag/probe.c \ diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index f378a95a212..32c13e71c3f 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -302,187 +302,6 @@ ucs_status_t ucp_ep_create(ucp_worker_h worker, return status; } -static void ucp_ep_flush_error(ucp_request_t *req, ucs_status_t status) -{ - if (ucp_ep_config(req->send.ep)->key.err_mode != UCP_ERR_HANDLING_MODE_PEER) { - ucs_error("error during flush: %s", ucs_status_string(status)); - } - - req->status = status; - --req->send.uct_comp.count; -} - -static void ucp_ep_flush_progress(ucp_request_t *req) -{ - ucp_ep_h ep = req->send.ep; - ucp_lane_index_t lane; - ucs_status_t status; - uct_ep_h uct_ep; - - ucs_trace("ep %p: progress flush req %p, lanes 0x%x count %d", ep, req, - req->send.flush.lanes, req->send.uct_comp.count); - - while (req->send.flush.lanes) { - - /* Search for next lane to start flush */ - lane = ucs_ffs64(req->send.flush.lanes); - uct_ep = ep->uct_eps[lane]; - if (uct_ep == NULL) { - req->send.flush.lanes &= ~UCS_BIT(lane); - --req->send.uct_comp.count; - continue; - } - - /* Start flush operation on UCT endpoint */ - if (req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) { - uct_ep_pending_purge(uct_ep, ucp_ep_err_pending_purge, - UCS_STATUS_PTR(UCS_ERR_CANCELED)); - } - status = uct_ep_flush(uct_ep, req->send.flush.uct_flags, - &req->send.uct_comp); - ucs_trace("flushing ep %p lane[%d]: %s", ep, lane, - ucs_status_string(status)); - if (status == UCS_OK) { - req->send.flush.lanes &= ~UCS_BIT(lane); - --req->send.uct_comp.count; - } else if (status == UCS_INPROGRESS) { - req->send.flush.lanes &= ~UCS_BIT(lane); - } else if (status == UCS_ERR_NO_RESOURCE) { - if (req->send.lane != UCP_NULL_LANE) { - ucs_trace("ep %p: not adding pending flush %p on lane %d, " - "because it's already pending on lane %d", - ep, req, lane, req->send.lane); - break; - } - - status = uct_ep_pending_add(uct_ep, &req->send.uct); - ucs_trace("adding pending flush on ep %p lane[%d]: %s", ep, lane, - ucs_status_string(status)); - if (status == UCS_OK) { - req->send.lane = lane; - req->send.flush.lanes &= ~UCS_BIT(lane); - } else if (status != UCS_ERR_BUSY) { - ucp_ep_flush_error(req, status); - break; - } - } else { - ucp_ep_flush_error(req, status); - break; - } - } -} - -static void ucp_ep_flush_slow_path_remove(ucp_request_t *req) -{ - ucp_ep_h ep = req->send.ep; - uct_worker_progress_unregister_safe(ep->worker->uct, - &req->send.flush.slow_cb_id); -} - -static unsigned ucp_ep_flushed_slow_path_callback(void *arg) -{ - ucp_request_t *req = arg; - ucp_ep_h ep = req->send.ep; - - ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); - - ucs_trace("flush req %p ep %p remove from uct_worker %p", req, ep, - ep->worker->uct); - ucp_ep_flush_slow_path_remove(req); - req->send.flush.flushed_cb(req); - - /* Complete send request from here, to avoid releasing the request while - * slow-path element is still pending */ - ucp_request_complete_send(req, req->status); - - return 0; -} - -static int ucp_flush_check_completion(ucp_request_t *req) -{ - ucp_ep_h ep = req->send.ep; - - /* Check if flushed all lanes */ - if (req->send.uct_comp.count != 0) { - return 0; - } - - ucs_trace("adding slow-path callback to destroy ep %p", ep); - ucp_ep_flush_slow_path_remove(req); - uct_worker_progress_register_safe(ep->worker->uct, - ucp_ep_flushed_slow_path_callback, req, 0, - &req->send.flush.slow_cb_id); - return 1; -} - -static unsigned ucp_ep_flush_resume_slow_path_callback(void *arg) -{ - ucp_request_t *req = arg; - - ucp_ep_flush_slow_path_remove(req); - ucp_ep_flush_progress(req); - ucp_flush_check_completion(req); - return 0; -} - -static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self) -{ - ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); - ucp_lane_index_t lane = req->send.lane; - ucp_ep_h ep = req->send.ep; - ucs_status_t status; - int completed; - - ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); - - status = uct_ep_flush(ep->uct_eps[lane], req->send.flush.uct_flags, - &req->send.uct_comp); - ucs_trace("flushing ep %p lane[%d]: %s", ep, lane, - ucs_status_string(status)); - if (status == UCS_OK) { - --req->send.uct_comp.count; /* UCT endpoint is flushed */ - } - - /* since req->flush.pend.lane is still non-NULL, this function will not - * put anything on pending. - */ - ucp_ep_flush_progress(req); - completed = ucp_flush_check_completion(req); - - /* If the operation has not completed, add slow-path progress to resume */ - if (!completed && req->send.flush.lanes) { - ucs_trace("ep %p: adding slow-path callback to resume flush", ep); - uct_worker_progress_register_safe(ep->worker->uct, - ucp_ep_flush_resume_slow_path_callback, - req, 0, &req->send.flush.slow_cb_id); - } - - if ((status == UCS_OK) || (status == UCS_INPROGRESS)) { - req->send.lane = UCP_NULL_LANE; - return UCS_OK; - } else if (status == UCS_ERR_NO_RESOURCE) { - return UCS_ERR_NO_RESOURCE; - } else { - ucp_ep_flush_error(req, status); - return UCS_OK; - } -} - -static void ucp_ep_flush_completion(uct_completion_t *self, ucs_status_t status) -{ - ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct_comp); - - ucs_trace("flush completion req=%p status=%d", req, status); - - ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); - - if (status == UCS_OK) { - req->status = status; - } - - ucp_ep_flush_progress(req); - ucp_flush_check_completion(req); -} void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg) { @@ -542,7 +361,7 @@ void ucp_ep_destroy_internal(ucp_ep_h ep, const char *message) ucs_free(ep); } -static void ucp_ep_disconnected(ucp_ep_h ep) +void ucp_ep_disconnected(ucp_ep_h ep) { ucp_recv_desc_t *rdesc; @@ -571,69 +390,6 @@ static void ucp_ep_disconnected(ucp_ep_h ep) ucp_ep_destroy_internal(ep, " from disconnect"); } -static void ucp_ep_flushed_callback(ucp_request_t *req) -{ - ucp_ep_disconnected(req->send.ep); -} - -static ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) -{ - ucs_status_t status; - ucp_request_t *req; - - ucs_debug("disconnect ep %p", ep); - - if (ep->flags & UCP_EP_FLAG_FAILED) { - ucp_ep_disconnected(ep); - return NULL; - } - - req = ucs_mpool_get(&ep->worker->req_mp); - if (req == NULL) { - return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); - } - - /* - * Flush operation can be queued on the pending queue of only one of the - * lanes (indicated by req->send.lane) and scheduled for completion on any - * number of lanes. req->send.uct_comp.count keeps track of how many lanes - * are not flushed yet, and when it reaches zero, it means all lanes are - * flushed. req->send.flush.lanes keeps track of which lanes we still have - * to start flush on. - * If a flush is completed from a pending/completion callback, we need to - * schedule slow-path callback to release the endpoint later, since a UCT - * endpoint cannot be released from pending/completion callback context. - */ - req->flags = 0; - req->status = UCS_OK; - req->send.ep = ep; - req->send.flush.flushed_cb = ucp_ep_flushed_callback; - req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep)); - req->send.flush.slow_cb_id = UCS_CALLBACKQ_ID_NULL; - req->send.flush.uct_flags = (mode == UCP_EP_CLOSE_MODE_FLUSH) ? - UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL; - - req->send.lane = UCP_NULL_LANE; - req->send.uct.func = ucp_ep_flush_progress_pending; - req->send.uct_comp.func = ucp_ep_flush_completion; - req->send.uct_comp.count = ucp_ep_num_lanes(ep); - - ucp_ep_flush_progress(req); - - if (req->send.uct_comp.count == 0) { - status = req->status; - ucs_trace_req("ep %p: releasing flush request %p, returning status %s", - ep, req, ucs_status_string(status)); - ucs_mpool_put(req); - ucp_ep_disconnected(ep); - return UCS_STATUS_PTR(status); - } - - ucs_trace_req("ep %p: return inprogress flush request %p (%p)", ep, req, - req + 1); - return req + 1; -} - ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode) { ucp_worker_h worker = ep->worker; diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index f6fd896ee1b..3e3b3b248eb 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -223,6 +223,10 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid, const char *peer_name, const char *message, ucp_ep_h *ep_p); +ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode); + +void ucp_ep_disconnected(ucp_ep_h ep); + ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid, const char *message, ucp_ep_h *ep_p); diff --git a/src/ucp/rma/basic_rma.c b/src/ucp/rma/basic_rma.c index 722bad0c31f..6d784ddb7c3 100644 --- a/src/ucp/rma/basic_rma.c +++ b/src/ucp/rma/basic_rma.c @@ -420,53 +420,3 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_fence, (worker), ucp_worker_h worker) return status; } -UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker) -{ - unsigned rsc_index; - - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); - - while (worker->wireup_pend_count > 0) { - ucp_worker_progress(worker); - } - - /* TODO flush in parallel */ - for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) { - if (worker->ifaces[rsc_index].iface == NULL) { - continue; - } - - while (uct_iface_flush(worker->ifaces[rsc_index].iface, 0, NULL) != UCS_OK) { - ucp_worker_progress(worker); - } - } - - UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); - - return UCS_OK; -} - -UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep) -{ - ucp_lane_index_t lane; - ucs_status_t status; - - UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); - - for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { - for (;;) { - status = uct_ep_flush(ep->uct_eps[lane], 0, NULL); - if (status == UCS_OK) { - break; - } else if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) { - goto out; - } - ucp_worker_progress(ep->worker); - } - } - - status = UCS_OK; -out: - UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); - return status; -} diff --git a/src/ucp/rma/flush.c b/src/ucp/rma/flush.c new file mode 100644 index 00000000000..2ee7dff20a1 --- /dev/null +++ b/src/ucp/rma/flush.c @@ -0,0 +1,306 @@ +/** + * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. + * + * See file LICENSE for terms. + */ + +#include +#include +#include + + +static void ucp_ep_flush_error(ucp_request_t *req, ucs_status_t status) +{ + if (ucp_ep_config(req->send.ep)->key.err_mode != UCP_ERR_HANDLING_MODE_PEER) { + ucs_error("error during flush: %s", ucs_status_string(status)); + } + + req->status = status; + --req->send.uct_comp.count; +} + +static void ucp_ep_flush_progress(ucp_request_t *req) +{ + ucp_ep_h ep = req->send.ep; + ucp_lane_index_t lane; + ucs_status_t status; + uct_ep_h uct_ep; + + ucs_trace("ep %p: progress flush req %p, lanes 0x%x count %d", ep, req, + req->send.flush.lanes, req->send.uct_comp.count); + + while (req->send.flush.lanes) { + + /* Search for next lane to start flush */ + lane = ucs_ffs64(req->send.flush.lanes); + uct_ep = ep->uct_eps[lane]; + if (uct_ep == NULL) { + req->send.flush.lanes &= ~UCS_BIT(lane); + --req->send.uct_comp.count; + continue; + } + + /* Start flush operation on UCT endpoint */ + if (req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) { + uct_ep_pending_purge(uct_ep, ucp_ep_err_pending_purge, + UCS_STATUS_PTR(UCS_ERR_CANCELED)); + } + status = uct_ep_flush(uct_ep, req->send.flush.uct_flags, + &req->send.uct_comp); + ucs_trace("flushing ep %p lane[%d]: %s", ep, lane, + ucs_status_string(status)); + if (status == UCS_OK) { + req->send.flush.lanes &= ~UCS_BIT(lane); + --req->send.uct_comp.count; + } else if (status == UCS_INPROGRESS) { + req->send.flush.lanes &= ~UCS_BIT(lane); + } else if (status == UCS_ERR_NO_RESOURCE) { + if (req->send.lane != UCP_NULL_LANE) { + ucs_trace("ep %p: not adding pending flush %p on lane %d, " + "because it's already pending on lane %d", + ep, req, lane, req->send.lane); + break; + } + + status = uct_ep_pending_add(uct_ep, &req->send.uct); + ucs_trace("adding pending flush on ep %p lane[%d]: %s", ep, lane, + ucs_status_string(status)); + if (status == UCS_OK) { + req->send.lane = lane; + req->send.flush.lanes &= ~UCS_BIT(lane); + } else if (status != UCS_ERR_BUSY) { + ucp_ep_flush_error(req, status); + break; + } + } else { + ucp_ep_flush_error(req, status); + break; + } + } +} + +static void ucp_ep_flush_slow_path_remove(ucp_request_t *req) +{ + ucp_ep_h ep = req->send.ep; + uct_worker_progress_unregister_safe(ep->worker->uct, + &req->send.flush.slow_cb_id); +} + +static unsigned ucp_ep_flushed_slow_path_callback(void *arg) +{ + ucp_request_t *req = arg; + ucp_ep_h ep = req->send.ep; + + ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); + + ucs_trace("flush req %p ep %p remove from uct_worker %p", req, ep, + ep->worker->uct); + ucp_ep_flush_slow_path_remove(req); + req->send.flush.flushed_cb(req); + + /* Complete send request from here, to avoid releasing the request while + * slow-path element is still pending */ + ucp_request_complete_send(req, req->status); + + return 0; +} + +static int ucp_flush_check_completion(ucp_request_t *req) +{ + ucp_ep_h ep = req->send.ep; + + /* Check if flushed all lanes */ + if (req->send.uct_comp.count != 0) { + return 0; + } + + ucs_trace("adding slow-path callback to destroy ep %p", ep); + ucp_ep_flush_slow_path_remove(req); + uct_worker_progress_register_safe(ep->worker->uct, + ucp_ep_flushed_slow_path_callback, req, 0, + &req->send.flush.slow_cb_id); + return 1; +} + +static unsigned ucp_ep_flush_resume_slow_path_callback(void *arg) +{ + ucp_request_t *req = arg; + + ucp_ep_flush_slow_path_remove(req); + ucp_ep_flush_progress(req); + ucp_flush_check_completion(req); + return 0; +} + +static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self) +{ + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); + ucp_lane_index_t lane = req->send.lane; + ucp_ep_h ep = req->send.ep; + ucs_status_t status; + int completed; + + ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); + + status = uct_ep_flush(ep->uct_eps[lane], req->send.flush.uct_flags, + &req->send.uct_comp); + ucs_trace("flushing ep %p lane[%d]: %s", ep, lane, + ucs_status_string(status)); + if (status == UCS_OK) { + --req->send.uct_comp.count; /* UCT endpoint is flushed */ + } + + /* since req->flush.pend.lane is still non-NULL, this function will not + * put anything on pending. + */ + ucp_ep_flush_progress(req); + completed = ucp_flush_check_completion(req); + + /* If the operation has not completed, add slow-path progress to resume */ + if (!completed && req->send.flush.lanes) { + ucs_trace("ep %p: adding slow-path callback to resume flush", ep); + uct_worker_progress_register_safe(ep->worker->uct, + ucp_ep_flush_resume_slow_path_callback, + req, 0, &req->send.flush.slow_cb_id); + } + + if ((status == UCS_OK) || (status == UCS_INPROGRESS)) { + req->send.lane = UCP_NULL_LANE; + return UCS_OK; + } else if (status == UCS_ERR_NO_RESOURCE) { + return UCS_ERR_NO_RESOURCE; + } else { + ucp_ep_flush_error(req, status); + return UCS_OK; + } +} + +static void ucp_ep_flush_completion(uct_completion_t *self, ucs_status_t status) +{ + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct_comp); + + ucs_trace("flush completion req=%p status=%d", req, status); + + ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); + + if (status == UCS_OK) { + req->status = status; + } + + ucp_ep_flush_progress(req); + ucp_flush_check_completion(req); +} + +static void ucp_ep_flushed_callback(ucp_request_t *req) +{ + ucp_ep_disconnected(req->send.ep); +} + +ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) +{ + ucs_status_t status; + ucp_request_t *req; + + ucs_debug("disconnect ep %p", ep); + + if (ep->flags & UCP_EP_FLAG_FAILED) { + ucp_ep_disconnected(ep); + return NULL; + } + + req = ucs_mpool_get(&ep->worker->req_mp); + if (req == NULL) { + return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); + } + + /* + * Flush operation can be queued on the pending queue of only one of the + * lanes (indicated by req->send.lane) and scheduled for completion on any + * number of lanes. req->send.uct_comp.count keeps track of how many lanes + * are not flushed yet, and when it reaches zero, it means all lanes are + * flushed. req->send.flush.lanes keeps track of which lanes we still have + * to start flush on. + * If a flush is completed from a pending/completion callback, we need to + * schedule slow-path callback to release the endpoint later, since a UCT + * endpoint cannot be released from pending/completion callback context. + */ + req->flags = 0; + req->status = UCS_OK; + req->send.ep = ep; + req->send.flush.flushed_cb = ucp_ep_flushed_callback; + req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep)); + req->send.flush.slow_cb_id = UCS_CALLBACKQ_ID_NULL; + req->send.flush.uct_flags = (mode == UCP_EP_CLOSE_MODE_FLUSH) ? + UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL; + + req->send.lane = UCP_NULL_LANE; + req->send.uct.func = ucp_ep_flush_progress_pending; + req->send.uct_comp.func = ucp_ep_flush_completion; + req->send.uct_comp.count = ucp_ep_num_lanes(ep); + + ucp_ep_flush_progress(req); + + if (req->send.uct_comp.count == 0) { + status = req->status; + ucs_trace_req("ep %p: releasing flush request %p, returning status %s", + ep, req, ucs_status_string(status)); + ucs_mpool_put(req); + ucp_ep_disconnected(ep); + return UCS_STATUS_PTR(status); + } + + ucs_trace_req("ep %p: return inprogress flush request %p (%p)", ep, req, + req + 1); + return req + 1; +} + +UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker) +{ + unsigned rsc_index; + + UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); + + while (worker->wireup_pend_count > 0) { + ucp_worker_progress(worker); + } + + /* TODO flush in parallel */ + for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) { + if (worker->ifaces[rsc_index].iface == NULL) { + continue; + } + + while (uct_iface_flush(worker->ifaces[rsc_index].iface, 0, NULL) != UCS_OK) { + ucp_worker_progress(worker); + } + } + + UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); + + return UCS_OK; +} + +UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep) +{ + ucp_lane_index_t lane; + ucs_status_t status; + + UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); + + for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { + for (;;) { + status = uct_ep_flush(ep->uct_eps[lane], 0, NULL); + if (status == UCS_OK) { + break; + } else if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) { + goto out; + } + ucp_worker_progress(ep->worker); + } + } + + status = UCS_OK; +out: + UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); + return status; +} From f38e106764664b4176c878307a89cc5fd9b0ce64 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 10 Oct 2017 20:31:52 +0300 Subject: [PATCH 2/2] UCP/API/TEST: Add non-blocking endpoint flush and use it for RMA tests The blocking version of endpoint flush may potentially cause a deadlock because it does not progress communications on anything except the current worker. Introduce a non-blocking flush and use it for unit tests. --- src/ucp/api/ucp.h | 27 ++++++--- src/ucp/api/ucp_compat.h | 7 +++ src/ucp/core/ucp_ep.c | 41 ++++++++++++- src/ucp/core/ucp_ep.h | 7 ++- src/ucp/core/ucp_request.h | 8 +-- src/ucp/core/ucp_types.h | 6 ++ src/ucp/rma/flush.c | 96 ++++++++++++++---------------- test/gtest/ucp/test_ucp_memheap.cc | 42 ++++++++----- test/gtest/ucp/test_ucp_memheap.h | 7 +++ test/gtest/ucp/test_ucp_wireup.cc | 40 +++++++------ test/gtest/ucp/ucp_test.cc | 17 ++++-- test/gtest/ucp/ucp_test.h | 8 ++- 12 files changed, 198 insertions(+), 108 deletions(-) diff --git a/src/ucp/api/ucp.h b/src/ucp/api/ucp.h index c188f801e78..df1a757f6f4 100644 --- a/src/ucp/api/ucp.h +++ b/src/ucp/api/ucp.h @@ -1588,8 +1588,8 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream); /** * @ingroup UCP_ENDPOINT * - * @brief Flush outstanding AMO and RMA operations on the @ref ucp_ep_h - * "endpoint". + * @brief Non-blocking flush of outstanding AMO and RMA operations on the + * @ref ucp_ep_h "endpoint". * * This routine flushes all outstanding AMO and RMA communications on the * @ref ucp_ep_h "endpoint". All the AMO and RMA operations issued on the @@ -1597,10 +1597,21 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream); * @ref ucp_ep_h "endpoint" when this call returns. * * @param [in] ep UCP endpoint. - * - * @return Error code as defined by @ref ucs_status_t - */ -ucs_status_t ucp_ep_flush(ucp_ep_h ep); + * @param [in] flags Flags for flush operation. Reserved for future use. + * @param [in] cb Callback which will be called when the flush operation + * completes. + * + * @return UCS_OK - The flush operation was completed immediately. + * @return UCS_PTR_IS_ERR(_ptr) - The flush operation failed. + * @return otherwise - Flush operation was scheduled and can be completed + * in any point in time. The request handle is returned + * to the application in order to track progress. The + * application is responsible to release the handle + * using @ref ucp_request_free "ucp_request_free()" + * routine. + */ +ucs_status_ptr_t ucp_ep_flush_nb(ucp_ep_h ep, unsigned flags, + ucp_send_callback_t cb); /** @@ -2645,8 +2656,8 @@ ucs_status_t ucp_atomic_cswap64(ucp_ep_h ep, uint64_t compare, uint64_t swap, * memory address @a remote_addr and the @ref ucp_rkey_h "remote memory handle" * @a rkey. * Return from the function does not guarantee completion. A user must - * call @ref ucp_ep_flush or @ref ucp_worker_flush to guarentee that the remote - * value has been updated. + * call @ref ucp_ep_flush_nb or @ref ucp_worker_flush to guarantee that the + * remote value has been updated. * * @param [in] ep UCP endpoint. * @param [in] opcode One of @ref ucp_atomic_post_op_t. diff --git a/src/ucp/api/ucp_compat.h b/src/ucp/api/ucp_compat.h index 6f939673978..70b50d54c7f 100644 --- a/src/ucp/api/ucp_compat.h +++ b/src/ucp/api/ucp_compat.h @@ -56,4 +56,11 @@ ucs_status_ptr_t ucp_disconnect_nb(ucp_ep_h ep); ucs_status_t ucp_request_test(void *request, ucp_tag_recv_info_t *info); +/** + * @ingroup UCP_ENDPOINT + * @deprecated Replaced by @ref ucp_ep_flush_nb. + */ +ucs_status_t ucp_ep_flush(ucp_ep_h ep); + + #endif diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 32c13e71c3f..fd1c375cbb5 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -302,7 +302,6 @@ ucs_status_t ucp_ep_create(ucp_worker_h worker, return status; } - void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); @@ -361,10 +360,12 @@ void ucp_ep_destroy_internal(ucp_ep_h ep, const char *message) ucs_free(ep); } -void ucp_ep_disconnected(ucp_ep_h ep) +static void ucp_ep_disconnected(ucp_ep_h ep) { ucp_recv_desc_t *rdesc; + ucs_trace("ep %p is disconnected", ep); + while (!ucs_queue_is_empty(&ep->stream_data)) { rdesc = ucs_queue_pull_elem_non_empty(&ep->stream_data, ucp_recv_desc_t, stream_queue); @@ -390,6 +391,32 @@ void ucp_ep_disconnected(ucp_ep_h ep) ucp_ep_destroy_internal(ep, " from disconnect"); } +static unsigned ucp_ep_do_disconnect(void *arg) +{ + ucp_request_t *req = arg; + + ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); + + ucp_ep_disconnected(req->send.ep); + + /* Complete send request from here, to avoid releasing the request while + * slow-path element is still pending */ + ucp_request_complete_send(req, req->status); + + return 0; +} + +static void ucp_ep_close_flushed_callback(ucp_request_t *req) +{ + ucp_ep_h ep = req->send.ep; + + ucs_trace("adding slow-path callback to destroy ep %p", ep); + req->send.disconnect.slow_cb_id = UCS_CALLBACKQ_ID_NULL; + uct_worker_progress_register_safe(ep->worker->uct, ucp_ep_do_disconnect, + req, UCS_CALLBACKQ_FLAG_ONESHOT, + &req->send.disconnect.slow_cb_id); +} + ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode) { ucp_worker_h worker = ep->worker; @@ -403,7 +430,15 @@ ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode) UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); UCS_ASYNC_BLOCK(&worker->async); - request = ucp_disconnect_nb_internal(ep, mode); + request = ucp_ep_flush_internal(ep, + (mode == UCP_EP_CLOSE_MODE_FLUSH) ? + UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL, + NULL, 0, + ucp_ep_close_flushed_callback); + if (!UCS_PTR_IS_PTR(request)) { + ucp_ep_disconnected(ep); + } + UCS_ASYNC_UNBLOCK(&worker->async); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 3e3b3b248eb..f18bc24b412 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -223,9 +223,10 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid, const char *peer_name, const char *message, ucp_ep_h *ep_p); -ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode); - -void ucp_ep_disconnected(ucp_ep_h ep); +ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags, + ucp_send_callback_t req_cb, + unsigned req_flags, + ucp_request_callback_t flushed_cb); ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid, const char *message, ucp_ep_h *ep_p); diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 52c92dbddc2..d8c23879483 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -68,10 +68,6 @@ enum { }; -/* Callback for UCP requests */ -typedef void (*ucp_request_callback_t)(ucp_request_t *req); - - /** * Request in progress. */ @@ -125,6 +121,10 @@ struct ucp_request { @ref uct_ep_flush */ } flush; + struct { + uct_worker_cb_id_t slow_cb_id;/* Slow-path callback */ + } disconnect; + struct { uint64_t remote_addr; /* Remote address */ ucp_atomic_fetch_op_t op; /* Requested AMO */ diff --git a/src/ucp/core/ucp_types.h b/src/ucp/core/ucp_types.h index ff96f889525..fd66fd96dd4 100644 --- a/src/ucp/core/ucp_types.h +++ b/src/ucp/core/ucp_types.h @@ -94,4 +94,10 @@ typedef void (*ucp_am_tracer_t)(ucp_worker_h worker, uct_am_trace_type_t type, char *buffer, size_t max); +/** + * Internal callback for UCP requests + */ +typedef void (*ucp_request_callback_t)(ucp_request_t *req); + + #endif diff --git a/src/ucp/rma/flush.c b/src/ucp/rma/flush.c index 2ee7dff20a1..cfb4ba1a5cc 100644 --- a/src/ucp/rma/flush.c +++ b/src/ucp/rma/flush.c @@ -86,39 +86,15 @@ static void ucp_ep_flush_slow_path_remove(ucp_request_t *req) &req->send.flush.slow_cb_id); } -static unsigned ucp_ep_flushed_slow_path_callback(void *arg) -{ - ucp_request_t *req = arg; - ucp_ep_h ep = req->send.ep; - - ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); - - ucs_trace("flush req %p ep %p remove from uct_worker %p", req, ep, - ep->worker->uct); - ucp_ep_flush_slow_path_remove(req); - req->send.flush.flushed_cb(req); - - /* Complete send request from here, to avoid releasing the request while - * slow-path element is still pending */ - ucp_request_complete_send(req, req->status); - - return 0; -} - static int ucp_flush_check_completion(ucp_request_t *req) { - ucp_ep_h ep = req->send.ep; - /* Check if flushed all lanes */ if (req->send.uct_comp.count != 0) { return 0; } - ucs_trace("adding slow-path callback to destroy ep %p", ep); ucp_ep_flush_slow_path_remove(req); - uct_worker_progress_register_safe(ep->worker->uct, - ucp_ep_flushed_slow_path_callback, req, 0, - &req->send.flush.slow_cb_id); + req->send.flush.flushed_cb(req); return 1; } @@ -191,12 +167,10 @@ static void ucp_ep_flush_completion(uct_completion_t *self, ucs_status_t status) ucp_flush_check_completion(req); } -static void ucp_ep_flushed_callback(ucp_request_t *req) -{ - ucp_ep_disconnected(req->send.ep); -} - -ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) +ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags, + ucp_send_callback_t req_cb, + unsigned req_flags, + ucp_request_callback_t flushed_cb) { ucs_status_t status; ucp_request_t *req; @@ -204,7 +178,6 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) ucs_debug("disconnect ep %p", ep); if (ep->flags & UCP_EP_FLAG_FAILED) { - ucp_ep_disconnected(ep); return NULL; } @@ -224,14 +197,14 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) * schedule slow-path callback to release the endpoint later, since a UCT * endpoint cannot be released from pending/completion callback context. */ - req->flags = 0; + req->flags = req_flags; req->status = UCS_OK; req->send.ep = ep; - req->send.flush.flushed_cb = ucp_ep_flushed_callback; + req->send.cb = req_cb; + req->send.flush.flushed_cb = flushed_cb; req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep)); req->send.flush.slow_cb_id = UCS_CALLBACKQ_ID_NULL; - req->send.flush.uct_flags = (mode == UCP_EP_CLOSE_MODE_FLUSH) ? - UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL; + req->send.flush.uct_flags = uct_flags; req->send.lane = UCP_NULL_LANE; req->send.uct.func = ucp_ep_flush_progress_pending; @@ -245,7 +218,6 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) ucs_trace_req("ep %p: releasing flush request %p, returning status %s", ep, req, ucs_status_string(status)); ucs_mpool_put(req); - ucp_ep_disconnected(ep); return UCS_STATUS_PTR(status); } @@ -254,6 +226,27 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) return req + 1; } +static void ucp_ep_flushed_callback(ucp_request_t *req) +{ + ucp_request_complete_send(req, req->status); +} + +ucs_status_ptr_t ucp_ep_flush_nb(ucp_ep_h ep, unsigned flags, + ucp_send_callback_t cb) +{ + void *request; + + UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); + + request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, + cb, UCP_REQUEST_FLAG_CALLBACK, + ucp_ep_flushed_callback); + + UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); + + return request; +} + UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker) { unsigned rsc_index; @@ -282,25 +275,28 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker) UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep) { - ucp_lane_index_t lane; - ucs_status_t status; + ucs_status_t status; + void *request; UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); - for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { - for (;;) { - status = uct_ep_flush(ep->uct_eps[lane], 0, NULL); - if (status == UCS_OK) { - break; - } else if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) { - goto out; - } + request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL, 0, + ucp_ep_flushed_callback); + if (request == NULL) { + status = UCS_OK; + } else if (UCS_PTR_IS_ERR(request)) { + ucs_warn("disconnect failed: %s", + ucs_status_string(UCS_PTR_STATUS(request))); + status = UCS_PTR_STATUS(request); + } else { + do { ucp_worker_progress(ep->worker); - } + status = ucp_request_check_status(request); + } while (status == UCS_INPROGRESS); + ucp_request_release(request); + status = UCS_OK; } - status = UCS_OK; -out: UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); return status; } diff --git a/test/gtest/ucp/test_ucp_memheap.cc b/test/gtest/ucp/test_ucp_memheap.cc index 37b4f3711f6..a6233b5bc9a 100644 --- a/test/gtest/ucp/test_ucp_memheap.cc +++ b/test/gtest/ucp/test_ucp_memheap.cc @@ -47,10 +47,7 @@ void test_ucp_memheap::test_nonblocking_implicit_stream_xfer(nonblocking_send_fu } memheap_size = max_iter * size + alignment; - sender().connect(&receiver(), get_ep_params()); - if (&sender() != &receiver()) { - receiver().connect(&sender(), get_ep_params()); - } + connect(is_ep_flush); params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH | @@ -111,7 +108,7 @@ void test_ucp_memheap::test_nonblocking_implicit_stream_xfer(nonblocking_send_fu } if (is_ep_flush) { - sender().flush_ep(); + flush_ep(sender()); } else { sender().flush_worker(); } @@ -123,10 +120,8 @@ void test_ucp_memheap::test_nonblocking_implicit_stream_xfer(nonblocking_send_fu } ucp_rkey_destroy(rkey); - receiver().flush_worker(); disconnect(sender()); - disconnect(receiver()); ucp_rkey_buffer_release(rkey_buffer); status = ucp_mem_unmap(receiver().ucph(), memh); @@ -159,10 +154,7 @@ void test_ucp_memheap::test_blocking_xfer(blocking_send_func_t send, zero_offset = 1; } - sender().connect(&receiver(), get_ep_params()); - if (&sender() != &receiver()) { - receiver().connect(&sender(), get_ep_params()); - } + connect(is_ep_flush); ucp_mem_h memh; void *memheap = NULL; @@ -233,7 +225,7 @@ void test_ucp_memheap::test_blocking_xfer(blocking_send_func_t send, rkey, expected_data); if (is_ep_flush) { - sender().flush_ep(); + flush_ep(sender()); } else { sender().flush_worker(); } @@ -245,15 +237,33 @@ void test_ucp_memheap::test_blocking_xfer(blocking_send_func_t send, } ucp_rkey_destroy(rkey); - receiver().flush_worker(); + + disconnect(sender()); status = ucp_mem_unmap(receiver().ucph(), memh); ASSERT_UCS_OK(status); - disconnect(sender()); - disconnect(receiver()); - if (malloc_allocate) { free(memheap); } } + +void test_ucp_memheap::connect(bool is_ep_flush) +{ + sender().connect(&receiver(), get_ep_params()); + if (!is_ep_flush) { + /* If we're going to flush the worker during the test, need to flush + * the endpoint now to make sure the connection establishment is + * complete. This is because worker flush is a blocking call which may + * hang while waiting for the connection establishment, while endpoint + * flush is non-blocking so does not have this problem. + */ + flush_ep(sender()); + } +} + +void test_ucp_memheap::flush_ep(const entity &e) +{ + void *request = e.flush_ep_nb(); + wait(request); +} diff --git a/test/gtest/ucp/test_ucp_memheap.h b/test/gtest/ucp/test_ucp_memheap.h index 87fb2e26b19..e940e34d8c4 100644 --- a/test/gtest/ucp/test_ucp_memheap.h +++ b/test/gtest/ucp/test_ucp_memheap.h @@ -50,6 +50,7 @@ class test_ucp_memheap : public ucp_test { protected: const static size_t DEFAULT_SIZE = 0; const static int DEFAULT_ITERS = 0; + void test_blocking_xfer(blocking_send_func_t send, size_t len, int max_iters, size_t alignment, bool malloc_allocate, bool is_ep_flush); @@ -57,6 +58,12 @@ class test_ucp_memheap : public ucp_test { size_t len, int max_iters, size_t alignment, bool malloc_allocate, bool is_ep_flush); + +private: + void connect(bool is_ep_flush); + + void flush_ep(const entity &e); + }; diff --git a/test/gtest/ucp/test_ucp_wireup.cc b/test/gtest/ucp/test_ucp_wireup.cc index 72f046ce2c6..116dc1cea72 100644 --- a/test/gtest/ucp/test_ucp_wireup.cc +++ b/test/gtest/ucp/test_ucp_wireup.cc @@ -49,9 +49,11 @@ class test_ucp_wireup : public ucp_test { void send_recv(ucp_ep_h ep, ucp_worker_h worker, size_t vecsize, int repeat); + void waitall(std::vector reqs); + void disconnect(ucp_ep_h ep); - void waitall(std::vector reqs); + void disconnect(ucp_test::entity &e); private: vec_type m_send_data; @@ -233,6 +235,10 @@ void test_ucp_wireup::disconnect(ucp_ep_h ep) { wait(req); } +void test_ucp_wireup::disconnect(ucp_test::entity &e) { + disconnect(e.revoke_ep()); +} + void test_ucp_wireup::waitall(std::vector reqs) { while (!reqs.empty()) { @@ -389,9 +395,9 @@ UCS_TEST_P(test_ucp_wireup, stress_connect) { receiver().connect(&sender(), get_ep_params()); } - disconnect(sender().revoke_ep()); + disconnect(sender()); if (!is_loopback()) { - disconnect(receiver().revoke_ep()); + disconnect(receiver()); } } } @@ -404,9 +410,9 @@ UCS_TEST_P(test_ucp_wireup, stress_connect2) { receiver().connect(&sender(), get_ep_params()); } - disconnect(sender().revoke_ep()); + disconnect(sender()); if (!is_loopback()) { - disconnect(receiver().revoke_ep()); + disconnect(receiver()); } } } @@ -416,16 +422,16 @@ UCS_TEST_P(test_ucp_wireup, connect_disconnect) { if (!is_loopback()) { receiver().connect(&sender(), get_ep_params()); } - test_ucp_wireup::disconnect(sender().revoke_ep()); + disconnect(sender()); if (!is_loopback()) { - receiver().disconnect(); + disconnect(receiver()); } } UCS_TEST_P(test_ucp_wireup, disconnect_nonexistent) { skip_loopback(); sender().connect(&receiver(), get_ep_params()); - sender().disconnect(); + disconnect(sender()); receiver().destroy_worker(); sender().destroy_worker(); } @@ -433,26 +439,26 @@ UCS_TEST_P(test_ucp_wireup, disconnect_nonexistent) { UCS_TEST_P(test_ucp_wireup, disconnect_reconnect) { sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 1); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 1); sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 1); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 1); } UCS_TEST_P(test_ucp_wireup, send_disconnect_onesided) { sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 100); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 100); } UCS_TEST_P(test_ucp_wireup, send_disconnect_onesided_nozcopy, "ZCOPY_THRESH=-1") { sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 100); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 100); } @@ -464,12 +470,12 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_reply1) { send_b(sender().ep(), 8, 1); if (!is_loopback()) { - sender().disconnect(); + disconnect(sender()); } recv_b(receiver().worker(), 8, 1); send_b(receiver().ep(), 8, 1); - receiver().disconnect(); + disconnect(receiver()); recv_b(sender().worker(), 8, 1); } @@ -478,7 +484,7 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_reply2) { send_b(sender().ep(), 8, 1); if (!is_loopback()) { - sender().disconnect(); + disconnect(sender()); } recv_b(receiver().worker(), 8, 1); @@ -487,7 +493,7 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_reply2) { } send_b(receiver().ep(), 8, 1); - receiver().disconnect(); + disconnect(receiver()); recv_b(sender().worker(), 8, 1); } @@ -495,7 +501,7 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_onesided_wait) { sender().connect(&receiver(), get_ep_params()); send_recv(sender().ep(), receiver().worker(), 8, 1); send_b(sender().ep(), 1000, 200); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 200); } diff --git a/test/gtest/ucp/ucp_test.cc b/test/gtest/ucp/ucp_test.cc index 2fff41ef333..ed95a9503e4 100644 --- a/test/gtest/ucp/ucp_test.cc +++ b/test/gtest/ucp/ucp_test.cc @@ -368,16 +368,23 @@ void ucp_test_base::entity::flush_ep(int worker_index, int ep_index) const { ASSERT_UCS_OK(status); } +void ucp_test_base::entity::empty_send_completion(void *r, ucs_status_t status) { +} + +void* ucp_test_base::entity::flush_ep_nb(int worker_index, int ep_index) const { + void *request = ucp_ep_flush_nb(ep(worker_index, ep_index), 0, + empty_send_completion); + if (!UCS_PTR_IS_PTR(request)) { + ASSERT_UCS_OK(UCS_PTR_STATUS(request)); + } + return request; +} + void ucp_test_base::entity::fence(int worker_index) const { ucs_status_t status = ucp_worker_fence(worker(worker_index)); ASSERT_UCS_OK(status); } -void ucp_test_base::entity::disconnect(int worker_index, int ep_index) { - flush_ep(worker_index, ep_index); - m_workers[worker_index].second[ep_index].reset(); -} - void* ucp_test_base::entity::disconnect_nb(int worker_index, int ep_index) const { ucp_ep_h ep = revoke_ep(worker_index, ep_index); if (ep == NULL) { diff --git a/test/gtest/ucp/ucp_test.h b/test/gtest/ucp/ucp_test.h index c7cc016b1ac..17976771aea 100644 --- a/test/gtest/ucp/ucp_test.h +++ b/test/gtest/ucp/ucp_test.h @@ -51,12 +51,12 @@ class ucp_test_base : public ucs::test_base { void flush_ep(int worker_index = 0, int ep_index = 0) const; + void *flush_ep_nb(int worker_index = 0, int ep_index = 0) const; + void flush_worker(int worker_index = 0) const; void fence(int worker_index = 0) const; - void disconnect(int worker_index = 0, int ep_index = 0); - void* disconnect_nb(int worker_index = 0, int ep_index = 0) const; void destroy_worker(int worker_index = 0); @@ -76,11 +76,15 @@ class ucp_test_base : public ucs::test_base { void cleanup(); static void ep_destructor(ucp_ep_h ep, entity *e); + protected: ucs::handle m_ucph; worker_vec_t m_workers; int num_workers; + + private: + static void empty_send_completion(void *r, ucs_status_t status); }; };