diff --git a/src/ucp/api/ucp.h b/src/ucp/api/ucp.h index c188f801e78f..575fca887c7c 100644 --- a/src/ucp/api/ucp.h +++ b/src/ucp/api/ucp.h @@ -1588,8 +1588,8 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream); /** * @ingroup UCP_ENDPOINT * - * @brief Flush outstanding AMO and RMA operations on the @ref ucp_ep_h - * "endpoint". + * @brief Non-blocking flush of outstanding AMO and RMA operations on the + * @ref ucp_ep_h "endpoint". * * This routine flushes all outstanding AMO and RMA communications on the * @ref ucp_ep_h "endpoint". All the AMO and RMA operations issued on the @@ -1597,10 +1597,21 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream); * @ref ucp_ep_h "endpoint" when this call returns. * * @param [in] ep UCP endpoint. - * - * @return Error code as defined by @ref ucs_status_t - */ -ucs_status_t ucp_ep_flush(ucp_ep_h ep); + * @param [in] flags Flags for flush operation. Reserved for future use. + * @param [in] cb Callback which will be called when the flush operation + * completes. + * + * @return UCS_OK - The flush operation was completed immediately. + * @return UCS_PTR_IS_ERR(_ptr) - The flush operation failed. + * @return otherwise - Flush operation was scheduled and can be completed + * in any point in time. The request handle is returned + * to the application in order to track progress. The + * application is responsible to release the handle + * using @ref ucp_request_free "ucp_request_free()" + * routine. + */ +ucs_status_ptr_t ucp_ep_flush_nb(ucp_ep_h ep, unsigned flags, + ucp_send_callback_t cb); /** @@ -2315,7 +2326,7 @@ ucs_status_t ucp_put(ucp_ep_h ep, const void *buffer, size_t length, * completed immediately the routine return UCS_OK, otherwise UCS_INPROGRESS * or an error is returned to user. * - * @note A user can use @ref ucp_worker_flush "ucp_worker_flush()" + * @note A user can use @ref ucp_worker_flush_nb "ucp_worker_flush_nb()" * in order to guarantee re-usability of the source address @e buffer. * * @param [in] ep Remote endpoint handle. @@ -2369,7 +2380,7 @@ ucs_status_t ucp_get(ucp_ep_h ep, void *buffer, size_t length, * address. The routine returns immediately and @b does @b not guarantee that * remote data is loaded and stored under the local address @e buffer. * - * @note A user can use @ref ucp_worker_flush "ucp_worker_flush()" in order + * @note A user can use @ref ucp_worker_flush_nb "ucp_worker_flush_nb()" in order * guarantee that remote data is loaded and stored under the local address * @e buffer. * @@ -2645,8 +2656,8 @@ ucs_status_t ucp_atomic_cswap64(ucp_ep_h ep, uint64_t compare, uint64_t swap, * memory address @a remote_addr and the @ref ucp_rkey_h "remote memory handle" * @a rkey. * Return from the function does not guarantee completion. A user must - * call @ref ucp_ep_flush or @ref ucp_worker_flush to guarentee that the remote - * value has been updated. + * call @ref ucp_ep_flush_nb or @ref ucp_worker_flush_nb to guarantee that the + * remote value has been updated. * * @param [in] ep UCP endpoint. * @param [in] opcode One of @ref ucp_atomic_post_op_t. @@ -2855,10 +2866,10 @@ void ucp_dt_destroy(ucp_datatype_t datatype); * which follow the call to @ref ucp_worker_fence "fence". 
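
The new ucp_ep_flush_nb() follows the usual UCP non-blocking request convention: NULL means the flush completed immediately, an error pointer means it failed, and any other pointer is a request that must be progressed and then released. A minimal caller-side sketch (not part of this patch), assuming `ep` and its owning `worker` are already set up; the names example_flush_cb and example_ep_flush are illustrative only:

#include <ucp/api/ucp.h>

/* Sketch only: completion is detected below via ucp_request_check_status(),
 * so the callback itself has nothing to do (same pattern as the
 * empty_send_completion() test helper added by this patch). */
static void example_flush_cb(void *request, ucs_status_t status)
{
    (void)request;
    (void)status;
}

/* Flush one endpoint and wait for completion by progressing its worker. */
static ucs_status_t example_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
{
    ucs_status_t status;
    void *request;

    request = ucp_ep_flush_nb(ep, 0, example_flush_cb);
    if (request == NULL) {
        return UCS_OK;                    /* flushed immediately */
    } else if (UCS_PTR_IS_ERR(request)) {
        return UCS_PTR_STATUS(request);   /* could not start the flush */
    }

    do {
        ucp_worker_progress(worker);
        status = ucp_request_check_status(request);
    } while (status == UCS_INPROGRESS);

    ucp_request_free(request);
    return status;
}
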
* * @note The primary difference between @ref ucp_worker_fence "ucp_worker_fence()" - * and the @ref ucp_worker_flush "ucp_worker_flush()" is the fact the fence + * and the @ref ucp_worker_flush_nb "ucp_worker_flush_nb()" is the fact the fence * routine does not guarantee completion of the operations on the call return but * only ensures the order between communication operations. The - * @ref ucp_worker_flush "flush" operation on return guarantees that all + * @ref ucp_worker_flush_nb "flush" operation on return guarantees that all * operations are completed and corresponding memory regions were updated. * * @param [in] worker UCP worker. @@ -2879,15 +2890,26 @@ ucs_status_t ucp_worker_fence(ucp_worker_h worker); * @a worker prior to this call are completed both at the origin and at the * target when this call returns. * - * @note For description of the differences between @ref ucp_worker_flush + * @note For description of the differences between @ref ucp_worker_flush_nb * "flush" and @ref ucp_worker_fence "fence" operations please see * @ref ucp_worker_fence "ucp_worker_fence()" * - * @param [in] worker UCP worker. - * - * @return Error code as defined by @ref ucs_status_t - */ -ucs_status_t ucp_worker_flush(ucp_worker_h worker); + * @param [in] worker UCP worker. + * @param [in] flags Flags for flush operation. Reserved for future use. + * @param [in] cb Callback which will be called when the flush operation + * completes. + * + * @return UCS_OK - The flush operation was completed immediately. + * @return UCS_PTR_IS_ERR(_ptr) - The flush operation failed. + * @return otherwise - Flush operation was scheduled and can be completed + * in any point in time. The request handle is returned + * to the application in order to track progress. The + * application is responsible to release the handle + * using @ref ucp_request_free "ucp_request_free()" + * routine. + */ +ucs_status_ptr_t ucp_worker_flush_nb(ucp_worker_h worker, unsigned flags, + ucp_send_callback_t cb); /** diff --git a/src/ucp/api/ucp_compat.h b/src/ucp/api/ucp_compat.h index 6f939673978a..e93830e47f10 100644 --- a/src/ucp/api/ucp_compat.h +++ b/src/ucp/api/ucp_compat.h @@ -56,4 +56,31 @@ ucs_status_ptr_t ucp_disconnect_nb(ucp_ep_h ep); ucs_status_t ucp_request_test(void *request, ucp_tag_recv_info_t *info); +/** + * @ingroup UCP_ENDPOINT + * @deprecated Replaced by @ref ucp_ep_flush_nb. + */ +ucs_status_t ucp_ep_flush(ucp_ep_h ep); + +/** + * @ingroup UCP_WORKER + * + * @brief Flush outstanding AMO and RMA operations on the @ref ucp_worker_h + * "worker" + * + * This routine flushes all outstanding AMO and RMA communications on the + * @ref ucp_worker_h "worker". All the AMO and RMA operations issued on the + * @a worker prior to this call are completed both at the origin and at the + * target when this call returns. + * + * @note For description of the differences between @ref ucp_worker_flush + * "flush" and @ref ucp_worker_fence "fence" operations please see + * @ref ucp_worker_fence "ucp_worker_fence()" + * + * @param [in] worker UCP worker. 
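
ucp_worker_flush_nb() can also be driven purely through its completion callback instead of polling the request status. A possible sketch (not part of this patch), assuming a single-threaded progress loop; the file-scope flag and the names example_worker_flush_cb and example_worker_flush are illustrative only:

#include <ucp/api/ucp.h>

/* Sketch only: a file-scope status keeps the example short; production code
 * would track completion per request. */
static volatile ucs_status_t example_flush_status;

static void example_worker_flush_cb(void *request, ucs_status_t status)
{
    (void)request;
    example_flush_status = status;
}

/* Flush all endpoints of a worker, waiting via the completion callback. */
static ucs_status_t example_worker_flush(ucp_worker_h worker)
{
    void *request;

    example_flush_status = UCS_INPROGRESS;
    request = ucp_worker_flush_nb(worker, 0, example_worker_flush_cb);
    if (request == NULL) {
        return UCS_OK;                    /* nothing was pending */
    } else if (UCS_PTR_IS_ERR(request)) {
        return UCS_PTR_STATUS(request);   /* could not start the flush */
    }

    while (example_flush_status == UCS_INPROGRESS) {
        ucp_worker_progress(worker);
    }

    ucp_request_free(request);
    return example_flush_status;
}
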
+ * + * @return Error code as defined by @ref ucs_status_t + */ +ucs_status_t ucp_worker_flush(ucp_worker_h worker); + #endif diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 32c13e71c3ff..20abe2b09940 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -302,7 +302,6 @@ ucs_status_t ucp_ep_create(ucp_worker_h worker, return status; } - void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); @@ -361,10 +360,12 @@ void ucp_ep_destroy_internal(ucp_ep_h ep, const char *message) ucs_free(ep); } -void ucp_ep_disconnected(ucp_ep_h ep) +static void ucp_ep_disconnected(ucp_ep_h ep) { ucp_recv_desc_t *rdesc; + ucs_trace("ep %p is disconnected", ep); + while (!ucs_queue_is_empty(&ep->stream_data)) { rdesc = ucs_queue_pull_elem_non_empty(&ep->stream_data, ucp_recv_desc_t, stream_queue); @@ -390,6 +391,36 @@ void ucp_ep_disconnected(ucp_ep_h ep) ucp_ep_destroy_internal(ep, " from disconnect"); } +static unsigned ucp_ep_do_disconnect(void *arg) +{ + ucp_request_t *req = arg; + + ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); + + ucp_ep_disconnected(req->send.ep); + + /* Complete send request from here, to avoid releasing the request while + * slow-path element is still pending */ + ucp_request_complete_send(req, req->status); + + return 0; +} + +static void ucp_ep_close_flushed_callback(ucp_request_t *req) +{ + ucp_ep_h ep = req->send.ep; + + /* If a flush is completed from a pending/completion callback, we need to + * schedule slow-path callback to release the endpoint later, since a UCT + * endpoint cannot be released from pending/completion callback context. + */ + ucs_trace("adding slow-path callback to destroy ep %p", ep); + req->send.disconnect.prog_id = UCS_CALLBACKQ_ID_NULL; + uct_worker_progress_register_safe(ep->worker->uct, ucp_ep_do_disconnect, + req, UCS_CALLBACKQ_FLAG_ONESHOT, + &req->send.disconnect.prog_id); +} + ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode) { ucp_worker_h worker = ep->worker; @@ -403,7 +434,15 @@ ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode) UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); UCS_ASYNC_BLOCK(&worker->async); - request = ucp_disconnect_nb_internal(ep, mode); + request = ucp_ep_flush_internal(ep, + (mode == UCP_EP_CLOSE_MODE_FLUSH) ? 
+ UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL, + NULL, 0, + ucp_ep_close_flushed_callback); + if (!UCS_PTR_IS_PTR(request)) { + ucp_ep_disconnected(ep); + } + UCS_ASYNC_UNBLOCK(&worker->async); UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 3e3b3b248ebe..f18bc24b4121 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -223,9 +223,10 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid, const char *peer_name, const char *message, ucp_ep_h *ep_p); -ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode); - -void ucp_ep_disconnected(ucp_ep_h ep); +ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags, + ucp_send_callback_t req_cb, + unsigned req_flags, + ucp_request_callback_t flushed_cb); ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid, const char *message, ucp_ep_h *ep_p); diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 52c92dbddc29..7d79e903e883 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -68,10 +68,6 @@ enum { }; -/* Callback for UCP requests */ -typedef void (*ucp_request_callback_t)(ucp_request_t *req); - - /** * Request in progress. */ @@ -118,13 +114,17 @@ struct ucp_request { } rndv_get; struct { - ucp_request_callback_t flushed_cb;/* Called when flushed */ - uct_worker_cb_id_t slow_cb_id;/* Slow-path callback */ - ucp_lane_map_t lanes; /* Which lanes need to be flushed */ - unsigned uct_flags; /* Flags to pass to + ucp_request_callback_t flushed_cb;/* Called when flushed */ + uct_worker_cb_id_t prog_id; /* Progress callback ID */ + ucp_lane_map_t lanes; /* Which lanes need to be flushed */ + unsigned uct_flags; /* Flags to pass to @ref uct_ep_flush */ } flush; + struct { + uct_worker_cb_id_t prog_id;/* Slow-path callback */ + } disconnect; + struct { uint64_t remote_addr; /* Remote address */ ucp_atomic_fetch_op_t op; /* Requested AMO */ @@ -167,6 +167,12 @@ struct ucp_request { ucp_mem_desc_t *rdesc; uct_tag_context_t uct_ctx; /* Transport offload context */ } recv; + + struct { + ucp_worker_h worker; /* Worker to flush */ + ucp_send_callback_t cb; /* Completion callback */ + uct_worker_cb_id_t prog_id; /* Progress callback ID */ + } flush_worker; }; }; diff --git a/src/ucp/core/ucp_types.h b/src/ucp/core/ucp_types.h index ff96f889525c..fd66fd96dd46 100644 --- a/src/ucp/core/ucp_types.h +++ b/src/ucp/core/ucp_types.h @@ -94,4 +94,10 @@ typedef void (*ucp_am_tracer_t)(ucp_worker_h worker, uct_am_trace_type_t type, char *buffer, size_t max); +/** + * Internal callback for UCP requests + */ +typedef void (*ucp_request_callback_t)(ucp_request_t *req); + + #endif diff --git a/src/ucp/rma/flush.c b/src/ucp/rma/flush.c index 2ee7dff20a18..d926414c03c1 100644 --- a/src/ucp/rma/flush.c +++ b/src/ucp/rma/flush.c @@ -83,42 +83,18 @@ static void ucp_ep_flush_slow_path_remove(ucp_request_t *req) { ucp_ep_h ep = req->send.ep; uct_worker_progress_unregister_safe(ep->worker->uct, - &req->send.flush.slow_cb_id); -} - -static unsigned ucp_ep_flushed_slow_path_callback(void *arg) -{ - ucp_request_t *req = arg; - ucp_ep_h ep = req->send.ep; - - ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); - - ucs_trace("flush req %p ep %p remove from uct_worker %p", req, ep, - ep->worker->uct); - ucp_ep_flush_slow_path_remove(req); - req->send.flush.flushed_cb(req); - - /* Complete send request from here, to avoid releasing the request while - * slow-path element is still pending */ - 
ucp_request_complete_send(req, req->status); - - return 0; + &req->send.flush.prog_id); } static int ucp_flush_check_completion(ucp_request_t *req) { - ucp_ep_h ep = req->send.ep; - /* Check if flushed all lanes */ if (req->send.uct_comp.count != 0) { return 0; } - ucs_trace("adding slow-path callback to destroy ep %p", ep); ucp_ep_flush_slow_path_remove(req); - uct_worker_progress_register_safe(ep->worker->uct, - ucp_ep_flushed_slow_path_callback, req, 0, - &req->send.flush.slow_cb_id); + req->send.flush.flushed_cb(req); return 1; } @@ -161,7 +137,7 @@ static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self) ucs_trace("ep %p: adding slow-path callback to resume flush", ep); uct_worker_progress_register_safe(ep->worker->uct, ucp_ep_flush_resume_slow_path_callback, - req, 0, &req->send.flush.slow_cb_id); + req, 0, &req->send.flush.prog_id); } if ((status == UCS_OK) || (status == UCS_INPROGRESS)) { @@ -191,12 +167,10 @@ static void ucp_ep_flush_completion(uct_completion_t *self, ucs_status_t status) ucp_flush_check_completion(req); } -static void ucp_ep_flushed_callback(ucp_request_t *req) -{ - ucp_ep_disconnected(req->send.ep); -} - -ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) +ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags, + ucp_send_callback_t req_cb, + unsigned req_flags, + ucp_request_callback_t flushed_cb) { ucs_status_t status; ucp_request_t *req; @@ -204,7 +178,6 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) ucs_debug("disconnect ep %p", ep); if (ep->flags & UCP_EP_FLAG_FAILED) { - ucp_ep_disconnected(ep); return NULL; } @@ -220,18 +193,15 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) * are not flushed yet, and when it reaches zero, it means all lanes are * flushed. req->send.flush.lanes keeps track of which lanes we still have * to start flush on. - * If a flush is completed from a pending/completion callback, we need to - * schedule slow-path callback to release the endpoint later, since a UCT - * endpoint cannot be released from pending/completion callback context. - */ - req->flags = 0; + */ + req->flags = req_flags; req->status = UCS_OK; req->send.ep = ep; - req->send.flush.flushed_cb = ucp_ep_flushed_callback; + req->send.cb = req_cb; + req->send.flush.flushed_cb = flushed_cb; req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep)); - req->send.flush.slow_cb_id = UCS_CALLBACKQ_ID_NULL; - req->send.flush.uct_flags = (mode == UCP_EP_CLOSE_MODE_FLUSH) ? 
- UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL; + req->send.flush.prog_id = UCS_CALLBACKQ_ID_NULL; + req->send.flush.uct_flags = uct_flags; req->send.lane = UCP_NULL_LANE; req->send.uct.func = ucp_ep_flush_progress_pending; @@ -245,7 +215,6 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) ucs_trace_req("ep %p: releasing flush request %p, returning status %s", ep, req, ucs_status_string(status)); ucs_mpool_put(req); - ucp_ep_disconnected(ep); return UCS_STATUS_PTR(status); } @@ -254,53 +223,154 @@ ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode) return req + 1; } -UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker) +static void ucp_ep_flushed_callback(ucp_request_t *req) { - unsigned rsc_index; + ucp_request_complete_send(req, req->status); +} - UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); +UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_ep_flush_nb, (ep, flags, cb), + ucp_ep_h ep, unsigned flags, ucp_send_callback_t cb) +{ + void *request; + + UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); + + request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, + cb, UCP_REQUEST_FLAG_CALLBACK, + ucp_ep_flushed_callback); + + UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); + + return request; +} + +static ucs_status_t ucp_worker_flush_check(ucp_worker_h worker) +{ + ucs_status_t status; + unsigned rsc_index; - while (worker->wireup_pend_count > 0) { - ucp_worker_progress(worker); + if (worker->wireup_pend_count > 0) { + return UCS_INPROGRESS; } - /* TODO flush in parallel */ for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) { if (worker->ifaces[rsc_index].iface == NULL) { continue; } - while (uct_iface_flush(worker->ifaces[rsc_index].iface, 0, NULL) != UCS_OK) { - ucp_worker_progress(worker); + status = uct_iface_flush(worker->ifaces[rsc_index].iface, 0, NULL); + if (status != UCS_OK) { + return status; } } + return UCS_OK; +} + +static unsigned ucp_worker_flush_progress(void *arg) +{ + ucp_request_t *req = arg; + ucp_worker_h worker = req->flush_worker.worker; + ucs_status_t status; + + status = ucp_worker_flush_check(worker); + if (status == UCS_INPROGRESS) { + return 0; + } + + uct_worker_progress_unregister_safe(worker->uct, &req->flush_worker.prog_id); + ucp_request_complete(req, flush_worker.cb, status); + return 0; +} + +static ucs_status_ptr_t ucp_worker_flush_nb_internal(ucp_worker_h worker, + ucp_send_callback_t cb, + unsigned req_flags) +{ + ucs_status_t status; + ucp_request_t *req; + + status = ucp_worker_flush_check(worker); + if (status != UCS_INPROGRESS) { + return UCS_STATUS_PTR(status); + } + + req = ucs_mpool_get(&worker->req_mp); + if (req == NULL) { + return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); + } + + req->flags = req_flags; + req->status = UCS_OK; + req->flush_worker.worker = worker; + req->flush_worker.cb = cb; + req->flush_worker.prog_id = UCS_CALLBACKQ_ID_NULL; + + uct_worker_progress_register_safe(worker->uct, ucp_worker_flush_progress, + req, 0, &req->flush_worker.prog_id); + return req + 1; +} + +UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_worker_flush_nb, (worker, flags, cb), + ucp_worker_h worker, unsigned flags, ucp_send_callback_t cb) +{ + void *request; + + UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); + + request = ucp_worker_flush_nb_internal(worker, cb, + UCP_REQUEST_FLAG_CALLBACK); + UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); - return UCS_OK; + return request; +} + +static ucs_status_t ucp_flush_wait(ucp_worker_h worker, void *request) +{ + 
ucs_status_t status; + + if (request == NULL) { + return UCS_OK; + } else if (UCS_PTR_IS_ERR(request)) { + ucs_warn("flush failed: %s", ucs_status_string(UCS_PTR_STATUS(request))); + return UCS_PTR_STATUS(request); + } else { + do { + ucp_worker_progress(worker); + status = ucp_request_check_status(request); + } while (status == UCS_INPROGRESS); + ucp_request_release(request); + return UCS_OK; + } +} + +UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker) +{ + ucs_status_t status; + void *request; + + UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock); + + request = ucp_worker_flush_nb_internal(worker, NULL, 0); + status = ucp_flush_wait(worker, request); + + UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock); + + return status; } UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep) { - ucp_lane_index_t lane; - ucs_status_t status; + ucs_status_t status; + void *request; UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); - for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { - for (;;) { - status = uct_ep_flush(ep->uct_eps[lane], 0, NULL); - if (status == UCS_OK) { - break; - } else if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) { - goto out; - } - ucp_worker_progress(ep->worker); - } - } + request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL, 0, + ucp_ep_flushed_callback); + status = ucp_flush_wait(ep->worker, request); - status = UCS_OK; -out: UCP_THREAD_CS_EXIT_CONDITIONAL(&ep->worker->mt_lock); return status; } diff --git a/test/gtest/ucp/test_ucp_atomic.cc b/test/gtest/ucp/test_ucp_atomic.cc index 0a1b1e7b76b7..9d54c90a7467 100644 --- a/test/gtest/ucp/test_ucp_atomic.cc +++ b/test/gtest/ucp/test_ucp_atomic.cc @@ -185,7 +185,7 @@ void test_ucp_atomic::nb_add(entity *e, size_t max_size, void *memheap_addr, memheap_addr, rkey); if (status == UCS_INPROGRESS) { - e->flush_worker(); + flush_worker(*e); } else { ASSERT_UCS_OK(status); } diff --git a/test/gtest/ucp/test_ucp_fence.cc b/test/gtest/ucp/test_ucp_fence.cc index ca0f426983cf..1a6793bede95 100644 --- a/test/gtest/ucp/test_ucp_fence.cc +++ b/test/gtest/ucp/test_ucp_fence.cc @@ -98,7 +98,7 @@ class test_ucp_fence : public test_ucp_atomic { (test->*m_send_2)(m_entity, &zero, &result, m_memheap, m_rkey); - m_entity->flush_worker(); + test->flush_worker(*m_entity); if (result != (uint64_t)(i+1)) (*error)++; diff --git a/test/gtest/ucp/test_ucp_memheap.cc b/test/gtest/ucp/test_ucp_memheap.cc index 37b4f3711f61..76a1e19a9a7b 100644 --- a/test/gtest/ucp/test_ucp_memheap.cc +++ b/test/gtest/ucp/test_ucp_memheap.cc @@ -48,9 +48,6 @@ void test_ucp_memheap::test_nonblocking_implicit_stream_xfer(nonblocking_send_fu memheap_size = max_iter * size + alignment; sender().connect(&receiver(), get_ep_params()); - if (&sender() != &receiver()) { - receiver().connect(&sender(), get_ep_params()); - } params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH | @@ -111,9 +108,9 @@ void test_ucp_memheap::test_nonblocking_implicit_stream_xfer(nonblocking_send_fu } if (is_ep_flush) { - sender().flush_ep(); + flush_ep(sender()); } else { - sender().flush_worker(); + flush_worker(sender()); } for (int i = 0; i < max_iter; ++i) { @@ -123,10 +120,8 @@ void test_ucp_memheap::test_nonblocking_implicit_stream_xfer(nonblocking_send_fu } ucp_rkey_destroy(rkey); - receiver().flush_worker(); disconnect(sender()); - disconnect(receiver()); ucp_rkey_buffer_release(rkey_buffer); status = ucp_mem_unmap(receiver().ucph(), memh); @@ -160,9 +155,6 @@ void 
test_ucp_memheap::test_blocking_xfer(blocking_send_func_t send, } sender().connect(&receiver(), get_ep_params()); - if (&sender() != &receiver()) { - receiver().connect(&sender(), get_ep_params()); - } ucp_mem_h memh; void *memheap = NULL; @@ -233,9 +225,9 @@ void test_ucp_memheap::test_blocking_xfer(blocking_send_func_t send, rkey, expected_data); if (is_ep_flush) { - sender().flush_ep(); + flush_ep(sender()); } else { - sender().flush_worker(); + flush_worker(sender()); } EXPECT_EQ(expected_data, @@ -245,14 +237,12 @@ void test_ucp_memheap::test_blocking_xfer(blocking_send_func_t send, } ucp_rkey_destroy(rkey); - receiver().flush_worker(); + + disconnect(sender()); status = ucp_mem_unmap(receiver().ucph(), memh); ASSERT_UCS_OK(status); - disconnect(sender()); - disconnect(receiver()); - if (malloc_allocate) { free(memheap); } diff --git a/test/gtest/ucp/test_ucp_memheap.h b/test/gtest/ucp/test_ucp_memheap.h index 87fb2e26b19f..ec0f5b9eee96 100644 --- a/test/gtest/ucp/test_ucp_memheap.h +++ b/test/gtest/ucp/test_ucp_memheap.h @@ -50,6 +50,7 @@ class test_ucp_memheap : public ucp_test { protected: const static size_t DEFAULT_SIZE = 0; const static int DEFAULT_ITERS = 0; + void test_blocking_xfer(blocking_send_func_t send, size_t len, int max_iters, size_t alignment, bool malloc_allocate, bool is_ep_flush); diff --git a/test/gtest/ucp/test_ucp_peer_failure.cc b/test/gtest/ucp/test_ucp_peer_failure.cc index d5b50f6906a9..509778dd59eb 100644 --- a/test/gtest/ucp/test_ucp_peer_failure.cc +++ b/test/gtest/ucp/test_ucp_peer_failure.cc @@ -91,7 +91,7 @@ class test_ucp_peer_failure : * self, self->ep_id, self->conn_id, * (int)ucs_queue_length(&self->tx.window)); */ - receiver().flush_worker(); + flush_worker(receiver()); m_entities.remove(&receiver()); } diff --git a/test/gtest/ucp/test_ucp_rma_mt.cc b/test/gtest/ucp/test_ucp_rma_mt.cc index 07e0e2328da2..9f9b97c72ab8 100644 --- a/test/gtest/ucp/test_ucp_rma_mt.cc +++ b/test/gtest/ucp/test_ucp_rma_mt.cc @@ -100,7 +100,7 @@ UCS_TEST_P(test_ucp_rma_mt, put_get) { (uintptr_t)((uint64_t *)memheap + i), rkey[worker_index]); ASSERT_UCS_OK(status); - sender().flush_worker(worker_index); + flush_worker(sender(), worker_index); EXPECT_EQ(orig_data[i], target_data[i]); } @@ -126,7 +126,7 @@ UCS_TEST_P(test_ucp_rma_mt, put_get) { (uintptr_t)((uint64_t *)memheap + i), rkey[worker_index]); ASSERT_UCS_OK_OR_INPROGRESS(status); - sender().flush_worker(worker_index); + flush_worker(sender(), worker_index); EXPECT_EQ(orig_data[i], target_data[i]); } @@ -152,7 +152,7 @@ UCS_TEST_P(test_ucp_rma_mt, put_get) { (uintptr_t)((uint64_t *)memheap + i), rkey[worker_index]); ASSERT_UCS_OK(status); - sender().flush_worker(worker_index); + flush_worker(sender(), worker_index); EXPECT_EQ(orig_data[i], target_data[i]); } @@ -178,7 +178,7 @@ UCS_TEST_P(test_ucp_rma_mt, put_get) { (uintptr_t)((uint64_t *)memheap + i), rkey[worker_index]); ASSERT_UCS_OK_OR_INPROGRESS(status); - sender().flush_worker(worker_index); + flush_worker(sender(), worker_index); EXPECT_EQ(orig_data[i], target_data[i]); } diff --git a/test/gtest/ucp/test_ucp_stream.cc b/test/gtest/ucp/test_ucp_stream.cc index c75587d036b9..1f2e2521a91d 100644 --- a/test/gtest/ucp/test_ucp_stream.cc +++ b/test/gtest/ucp/test_ucp_stream.cc @@ -126,7 +126,7 @@ UCS_TEST_P(test_ucp_stream_many2one, drop_data) { } for (size_t i = 0; i < m_nsenders + 1; ++i) { - e(i).flush_worker(); + flush_worker(e(i)); } /* Need to poll out all incoming data from transport layer */ diff --git a/test/gtest/ucp/test_ucp_wireup.cc 
b/test/gtest/ucp/test_ucp_wireup.cc index 72f046ce2c60..9c63e9d76d9a 100644 --- a/test/gtest/ucp/test_ucp_wireup.cc +++ b/test/gtest/ucp/test_ucp_wireup.cc @@ -49,9 +49,11 @@ class test_ucp_wireup : public ucp_test { void send_recv(ucp_ep_h ep, ucp_worker_h worker, size_t vecsize, int repeat); + void waitall(std::vector reqs); + void disconnect(ucp_ep_h ep); - void waitall(std::vector reqs); + void disconnect(ucp_test::entity &e); private: vec_type m_send_data; @@ -233,6 +235,10 @@ void test_ucp_wireup::disconnect(ucp_ep_h ep) { wait(req); } +void test_ucp_wireup::disconnect(ucp_test::entity &e) { + disconnect(e.revoke_ep()); +} + void test_ucp_wireup::waitall(std::vector reqs) { while (!reqs.empty()) { @@ -312,7 +318,7 @@ UCS_TEST_P(test_ucp_wireup, empty_address) { UCS_TEST_P(test_ucp_wireup, one_sided_wireup) { sender().connect(&receiver(), get_ep_params()); send_recv(sender().ep(), receiver().worker(), 1, 1); - sender().flush_worker(); + flush_worker(sender()); } UCS_TEST_P(test_ucp_wireup, two_sided_wireup) { @@ -322,9 +328,9 @@ UCS_TEST_P(test_ucp_wireup, two_sided_wireup) { } send_recv(sender().ep(), receiver().worker(), 1, 1); - sender().flush_worker(); + flush_worker(sender()); send_recv(receiver().ep(), sender().worker(), 1, 1); - receiver().flush_worker(); + flush_worker(receiver()); } UCS_TEST_P(test_ucp_wireup, multi_wireup) { @@ -352,7 +358,7 @@ UCS_TEST_P(test_ucp_wireup, reply_ep_send_before) { ucp_ep_h ep = ucp_worker_get_reply_ep(receiver().worker(), sender().worker()->uuid); send_recv(ep, sender().worker(), 1, 1); - sender().flush_worker(); + flush_worker(sender()); disconnect(ep); } @@ -368,13 +374,13 @@ UCS_TEST_P(test_ucp_wireup, reply_ep_send_after) { /* Make sure the wireup message arrives before sending a reply */ send_recv(sender().ep(), receiver().worker(), 1, 1); - sender().flush_worker(); + flush_worker(sender()); /* Send a reply */ ucp_ep_h ep = ucp_worker_get_reply_ep(receiver().worker(), sender().worker()->uuid); send_recv(ep, sender().worker(), 1, 1); - sender().flush_worker(); + flush_worker(sender()); disconnect(ep); } @@ -389,9 +395,9 @@ UCS_TEST_P(test_ucp_wireup, stress_connect) { receiver().connect(&sender(), get_ep_params()); } - disconnect(sender().revoke_ep()); + disconnect(sender()); if (!is_loopback()) { - disconnect(receiver().revoke_ep()); + disconnect(receiver()); } } } @@ -404,9 +410,9 @@ UCS_TEST_P(test_ucp_wireup, stress_connect2) { receiver().connect(&sender(), get_ep_params()); } - disconnect(sender().revoke_ep()); + disconnect(sender()); if (!is_loopback()) { - disconnect(receiver().revoke_ep()); + disconnect(receiver()); } } } @@ -416,16 +422,16 @@ UCS_TEST_P(test_ucp_wireup, connect_disconnect) { if (!is_loopback()) { receiver().connect(&sender(), get_ep_params()); } - test_ucp_wireup::disconnect(sender().revoke_ep()); + disconnect(sender()); if (!is_loopback()) { - receiver().disconnect(); + disconnect(receiver()); } } UCS_TEST_P(test_ucp_wireup, disconnect_nonexistent) { skip_loopback(); sender().connect(&receiver(), get_ep_params()); - sender().disconnect(); + disconnect(sender()); receiver().destroy_worker(); sender().destroy_worker(); } @@ -433,26 +439,26 @@ UCS_TEST_P(test_ucp_wireup, disconnect_nonexistent) { UCS_TEST_P(test_ucp_wireup, disconnect_reconnect) { sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 1); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 1); sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 1); - sender().disconnect(); + 
disconnect(sender()); recv_b(receiver().worker(), 1000, 1); } UCS_TEST_P(test_ucp_wireup, send_disconnect_onesided) { sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 100); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 100); } UCS_TEST_P(test_ucp_wireup, send_disconnect_onesided_nozcopy, "ZCOPY_THRESH=-1") { sender().connect(&receiver(), get_ep_params()); send_b(sender().ep(), 1000, 100); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 100); } @@ -464,12 +470,12 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_reply1) { send_b(sender().ep(), 8, 1); if (!is_loopback()) { - sender().disconnect(); + disconnect(sender()); } recv_b(receiver().worker(), 8, 1); send_b(receiver().ep(), 8, 1); - receiver().disconnect(); + disconnect(receiver()); recv_b(sender().worker(), 8, 1); } @@ -478,7 +484,7 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_reply2) { send_b(sender().ep(), 8, 1); if (!is_loopback()) { - sender().disconnect(); + disconnect(sender()); } recv_b(receiver().worker(), 8, 1); @@ -487,7 +493,7 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_reply2) { } send_b(receiver().ep(), 8, 1); - receiver().disconnect(); + disconnect(receiver()); recv_b(sender().worker(), 8, 1); } @@ -495,7 +501,7 @@ UCS_TEST_P(test_ucp_wireup, send_disconnect_onesided_wait) { sender().connect(&receiver(), get_ep_params()); send_recv(sender().ep(), receiver().worker(), 8, 1); send_b(sender().ep(), 1000, 200); - sender().disconnect(); + disconnect(sender()); recv_b(receiver().worker(), 1000, 200); } @@ -544,19 +550,19 @@ UCS_TEST_P(test_ucp_wireup_errh_peer, msg_after_ep_create) { sender().connect(&receiver(), get_ep_params()); send_recv(sender().ep(), receiver().worker(), 1, 1); - sender().flush_worker(); + flush_worker(sender()); } UCS_TEST_P(test_ucp_wireup_errh_peer, msg_before_ep_create) { sender().connect(&receiver(), get_ep_params()); send_recv(sender().ep(), receiver().worker(), 1, 1); - sender().flush_worker(); + flush_worker(sender()); receiver().connect(&sender(), get_ep_params()); send_recv(receiver().ep(), sender().worker(), 1, 1); - receiver().flush_worker(); + flush_worker(receiver()); } UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_errh_peer) diff --git a/test/gtest/ucp/ucp_test.cc b/test/gtest/ucp/ucp_test.cc index 2fff41ef333a..da13afb67a69 100644 --- a/test/gtest/ucp/ucp_test.cc +++ b/test/gtest/ucp/ucp_test.cc @@ -113,9 +113,21 @@ void ucp_test::short_progress_loop(int worker_index) const { } } +void ucp_test::flush_ep(const entity &e, int worker_index, int ep_index) +{ + void *request = e.flush_ep_nb(worker_index, ep_index); + wait(request, worker_index); +} + +void ucp_test::flush_worker(const entity &e, int worker_index) +{ + void *request = e.flush_worker_nb(worker_index); + wait(request, worker_index); +} + void ucp_test::disconnect(const entity& entity) { for (int i = 0; i < entity.get_num_workers(); i++) { - entity.flush_worker(i); + flush_worker(entity, i); void *dreq = entity.disconnect_nb(i); if (!UCS_PTR_IS_PTR(dreq)) { ASSERT_UCS_OK(UCS_PTR_STATUS(dreq)); @@ -355,17 +367,18 @@ void ucp_test_base::entity::connect(const entity* other, } } -void ucp_test_base::entity::flush_worker(int worker_index) const { - if (worker(worker_index) == NULL) { - return; - } - ucs_status_t status = ucp_worker_flush(worker(worker_index)); - ASSERT_UCS_OK(status); +void ucp_test_base::entity::empty_send_completion(void *r, ucs_status_t status) { } -void ucp_test_base::entity::flush_ep(int worker_index, int ep_index) 
const { - ucs_status_t status = ucp_ep_flush(ep(worker_index, ep_index)); - ASSERT_UCS_OK(status); +void* ucp_test_base::entity::flush_ep_nb(int worker_index, int ep_index) const { + return ucp_ep_flush_nb(ep(worker_index, ep_index), 0, empty_send_completion); +} + +void* ucp_test_base::entity::flush_worker_nb(int worker_index) const { + if (worker(worker_index) == NULL) { + return NULL; + } + return ucp_worker_flush_nb(worker(worker_index), 0, empty_send_completion); } void ucp_test_base::entity::fence(int worker_index) const { @@ -373,11 +386,6 @@ void ucp_test_base::entity::fence(int worker_index) const { ASSERT_UCS_OK(status); } -void ucp_test_base::entity::disconnect(int worker_index, int ep_index) { - flush_ep(worker_index, ep_index); - m_workers[worker_index].second[ep_index].reset(); -} - void* ucp_test_base::entity::disconnect_nb(int worker_index, int ep_index) const { ucp_ep_h ep = revoke_ep(worker_index, ep_index); if (ep == NULL) { diff --git a/test/gtest/ucp/ucp_test.h b/test/gtest/ucp/ucp_test.h index c7cc016b1ac5..8b4bfa2458e0 100644 --- a/test/gtest/ucp/ucp_test.h +++ b/test/gtest/ucp/ucp_test.h @@ -49,14 +49,12 @@ class ucp_test_base : public ucs::test_base { void connect(const entity* other, const ucp_ep_params_t& ep_params, int ep_idx = 0); - void flush_ep(int worker_index = 0, int ep_index = 0) const; + void *flush_ep_nb(int worker_index = 0, int ep_index = 0) const; - void flush_worker(int worker_index = 0) const; + void* flush_worker_nb(int worker_index = 0) const; void fence(int worker_index = 0) const; - void disconnect(int worker_index = 0, int ep_index = 0); - void* disconnect_nb(int worker_index = 0, int ep_index = 0) const; void destroy_worker(int worker_index = 0); @@ -76,11 +74,15 @@ class ucp_test_base : public ucs::test_base { void cleanup(); static void ep_destructor(ucp_ep_h ep, entity *e); + protected: ucs::handle m_ucph; worker_vec_t m_workers; int num_workers; + + private: + static void empty_send_completion(void *r, ucs_status_t status); }; }; @@ -137,6 +139,8 @@ class ucp_test : public ucp_test_base, unsigned progress(int worker_index = 0) const; void short_progress_loop(int worker_index = 0) const; + void flush_ep(const entity &e, int worker_index = 0, int ep_index = 0); + void flush_worker(const entity &e, int worker_index = 0); void disconnect(const entity& entity); void wait(void *req, int worker_index = 0); void set_ucp_config(ucp_config_t *config);
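
With ucp_ep_close_nb() now routed through the same ucp_ep_flush_internal() machinery (UCP_EP_CLOSE_MODE_FLUSH maps to UCT_FLUSH_FLAG_LOCAL), a graceful application-side close looks much like waiting for a flush request. A sketch under the same assumptions as the examples above; example_ep_close is an illustrative name, not part of this patch:

#include <ucp/api/ucp.h>

/* Sketch only: close an endpoint gracefully, waiting until its outstanding
 * operations are flushed before the request is released. */
static void example_ep_close(ucp_worker_h worker, ucp_ep_h ep)
{
    void *request = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FLUSH);

    if (UCS_PTR_IS_PTR(request)) {
        while (ucp_request_check_status(request) == UCS_INPROGRESS) {
            ucp_worker_progress(worker);
        }
        ucp_request_free(request);
    } else if (UCS_PTR_STATUS(request) != UCS_OK) {
        /* immediate failure; the endpoint handle is no longer usable */
    }
}
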