Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/API/TEST: Add non-blocking endpoint flush and use it for RMA tests #1900

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ libucp_la_SOURCES = \
dt/dt.c \
proto/proto_am.c \
rma/basic_rma.c \
rma/flush.c \
tag/eager_rcv.c \
tag/eager_snd.c \
tag/probe.c \
Expand Down
27 changes: 19 additions & 8 deletions src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1588,19 +1588,30 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream);
/**
* @ingroup UCP_ENDPOINT
*
* @brief Flush outstanding AMO and RMA operations on the @ref ucp_ep_h
* "endpoint".
* @brief Non-blocking flush of outstanding AMO and RMA operations on the
* @ref ucp_ep_h "endpoint".
*
* This routine flushes all outstanding AMO and RMA communications on the
* @ref ucp_ep_h "endpoint". All the AMO and RMA operations issued on the
* @a ep prior to this call are completed both at the origin and at the target
* @ref ucp_ep_h "endpoint" when this call returns.
*
* @param [in] ep UCP endpoint.
*
* @return Error code as defined by @ref ucs_status_t
*/
ucs_status_t ucp_ep_flush(ucp_ep_h ep);
* @param [in] flags Flags for flush operation. Reserved for future use.
* @param [in] cb Callback which will be called when the flush operation
* completes.
*
* @return UCS_OK - The flush operation was completed immediately.
* @return UCS_PTR_IS_ERR(_ptr) - The flush operation failed.
* @return otherwise - Flush operation was scheduled and can be completed
* in any point in time. The request handle is returned
* to the application in order to track progress. The
* application is responsible to release the handle
* using @ref ucp_request_free "ucp_request_free()"
* routine.
*/
ucs_status_ptr_t ucp_ep_flush_nb(ucp_ep_h ep, unsigned flags,
ucp_send_callback_t cb);


/**
Expand Down Expand Up @@ -2645,8 +2656,8 @@ ucs_status_t ucp_atomic_cswap64(ucp_ep_h ep, uint64_t compare, uint64_t swap,
* memory address @a remote_addr and the @ref ucp_rkey_h "remote memory handle"
* @a rkey.
* Return from the function does not guarantee completion. A user must
call @ref ucp_ep_flush or @ref ucp_worker_flush to guarantee that the remote
* value has been updated.
* call @ref ucp_ep_flush_nb or @ref ucp_worker_flush to guarantee that the
* remote value has been updated.
*
* @param [in] ep UCP endpoint.
* @param [in] opcode One of @ref ucp_atomic_post_op_t.
Expand Down
7 changes: 7 additions & 0 deletions src/ucp/api/ucp_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@ ucs_status_ptr_t ucp_disconnect_nb(ucp_ep_h ep);
ucs_status_t ucp_request_test(void *request, ucp_tag_recv_info_t *info);


/**
* @ingroup UCP_ENDPOINT
* @deprecated Replaced by @ref ucp_ep_flush_nb.
*/
ucs_status_t ucp_ep_flush(ucp_ep_h ep);


#endif
265 changes: 28 additions & 237 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,188 +302,6 @@ ucs_status_t ucp_ep_create(ucp_worker_h worker,
return status;
}

/* Record a flush failure on the request and mark one more lane as done.
 * The error is logged unless the endpoint uses peer error-handling mode,
 * in which case the failure is expected to be reported to the user. */
static void ucp_ep_flush_error(ucp_request_t *req, ucs_status_t status)
{
    if (UCP_ERR_HANDLING_MODE_PEER !=
        ucp_ep_config(req->send.ep)->key.err_mode) {
        ucs_error("error during flush: %s", ucs_status_string(status));
    }

    req->send.uct_comp.count--;
    req->status = status;
}

/*
 * Try to start/advance the flush on every lane recorded in
 * req->send.flush.lanes (a bitmap of lanes still to be started).
 * req->send.uct_comp.count tracks how many lanes are not yet flushed.
 * At most one lane at a time (req->send.lane) may hold the request on its
 * pending queue.
 */
static void ucp_ep_flush_progress(ucp_request_t *req)
{
    ucp_ep_h ep = req->send.ep;
    ucp_lane_index_t lane;
    ucs_status_t status;
    uct_ep_h uct_ep;

    ucs_trace("ep %p: progress flush req %p, lanes 0x%x count %d", ep, req,
              req->send.flush.lanes, req->send.uct_comp.count);

    while (req->send.flush.lanes) {

        /* Search for next lane to start flush */
        lane = ucs_ffs64(req->send.flush.lanes);
        uct_ep = ep->uct_eps[lane];
        if (uct_ep == NULL) {
            /* No transport endpoint on this lane - nothing to flush */
            req->send.flush.lanes &= ~UCS_BIT(lane);
            --req->send.uct_comp.count;
            continue;
        }

        /* Start flush operation on UCT endpoint */
        if (req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) {
            /* Cancel mode: fail any queued sends first so the flush cannot
             * block behind them */
            uct_ep_pending_purge(uct_ep, ucp_ep_err_pending_purge,
                                 UCS_STATUS_PTR(UCS_ERR_CANCELED));
        }
        status = uct_ep_flush(uct_ep, req->send.flush.uct_flags,
                              &req->send.uct_comp);
        ucs_trace("flushing ep %p lane[%d]: %s", ep, lane,
                  ucs_status_string(status));
        if (status == UCS_OK) {
            /* Lane flushed immediately - account for it here */
            req->send.flush.lanes &= ~UCS_BIT(lane);
            --req->send.uct_comp.count;
        } else if (status == UCS_INPROGRESS) {
            /* Started; the uct_comp callback will decrement the count */
            req->send.flush.lanes &= ~UCS_BIT(lane);
        } else if (status == UCS_ERR_NO_RESOURCE) {
            if (req->send.lane != UCP_NULL_LANE) {
                /* The request can sit on only one pending queue at a time */
                ucs_trace("ep %p: not adding pending flush %p on lane %d, "
                          "because it's already pending on lane %d",
                          ep, req, lane, req->send.lane);
                break;
            }

            status = uct_ep_pending_add(uct_ep, &req->send.uct);
            ucs_trace("adding pending flush on ep %p lane[%d]: %s", ep, lane,
                      ucs_status_string(status));
            if (status == UCS_OK) {
                /* Queued on this lane; it will be retried from the pending
                 * callback */
                req->send.lane = lane;
                req->send.flush.lanes &= ~UCS_BIT(lane);
            } else if (status != UCS_ERR_BUSY) {
                /* Unexpected pending-add failure */
                ucp_ep_flush_error(req, status);
                break;
            }
            /* UCS_ERR_BUSY: resources became available - loop retries the
             * same lane */
        } else {
            /* Hard flush error on this lane */
            ucp_ep_flush_error(req, status);
            break;
        }
    }
}

/* Deregister the flush request's slow-path progress callback (no-op if the
 * callback id slot is already unset). */
static void ucp_ep_flush_slow_path_remove(ucp_request_t *req)
{
    uct_worker_progress_unregister_safe(req->send.ep->worker->uct,
                                        &req->send.flush.slow_cb_id);
}

/*
 * Slow-path progress callback invoked once all lanes are flushed: removes
 * itself, runs the request's flushed_cb, then completes the send request.
 * Runs from the worker progress slow path so completion does not happen
 * inside a UCT pending/completion callback context.
 */
static unsigned ucp_ep_flushed_slow_path_callback(void *arg)
{
    ucp_request_t *req = arg;
    ucp_ep_h ep = req->send.ep;

    ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));

    ucs_trace("flush req %p ep %p remove from uct_worker %p", req, ep,
              ep->worker->uct);
    ucp_ep_flush_slow_path_remove(req);
    /* NOTE(review): flushed_cb may release the endpoint (e.g. disconnect
     * path) - 'ep' must not be dereferenced after this call */
    req->send.flush.flushed_cb(req);

    /* Complete send request from here, to avoid releasing the request while
     * slow-path element is still pending */
    ucp_request_complete_send(req, req->status);

    return 0;
}

/*
 * Check whether the flush finished on all lanes. If so, schedule the
 * "flushed" slow-path callback and return 1; otherwise return 0.
 */
static int ucp_flush_check_completion(ucp_request_t *req)
{
    ucp_ep_h ep = req->send.ep;

    /* Check if flushed all lanes */
    if (req->send.uct_comp.count != 0) {
        return 0;
    }

    ucs_trace("adding slow-path callback to destroy ep %p", ep);
    /* Completion (and possible endpoint destruction) must not run from a
     * pending/completion callback context. Remove any "resume" callback
     * first, then reuse the same slow_cb_id slot for the completion one. */
    ucp_ep_flush_slow_path_remove(req);
    uct_worker_progress_register_safe(ep->worker->uct,
                                      ucp_ep_flushed_slow_path_callback, req, 0,
                                      &req->send.flush.slow_cb_id);
    return 1;
}

static unsigned ucp_ep_flush_resume_slow_path_callback(void *arg)
{
ucp_request_t *req = arg;

ucp_ep_flush_slow_path_remove(req);
ucp_ep_flush_progress(req);
ucp_flush_check_completion(req);
return 0;
}

/*
 * Pending-queue callback for a flush request which previously got
 * UCS_ERR_NO_RESOURCE on lane req->send.lane: retries the flush on that
 * lane, then resumes flushing the remaining lanes.
 *
 * Returns UCS_OK to remove the request from the pending queue, or
 * UCS_ERR_NO_RESOURCE to keep it queued.
 */
static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
    ucp_lane_index_t lane = req->send.lane;
    ucp_ep_h ep = req->send.ep;
    ucs_status_t status;
    int completed;

    ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));

    status = uct_ep_flush(ep->uct_eps[lane], req->send.flush.uct_flags,
                          &req->send.uct_comp);
    ucs_trace("flushing ep %p lane[%d]: %s", ep, lane,
              ucs_status_string(status));
    if (status == UCS_OK) {
        --req->send.uct_comp.count; /* UCT endpoint is flushed */
    }

    /* since req->flush.pend.lane is still non-NULL, this function will not
     * put anything on pending.
     */
    ucp_ep_flush_progress(req);
    completed = ucp_flush_check_completion(req);

    /* If the operation has not completed, add slow-path progress to resume */
    if (!completed && req->send.flush.lanes) {
        ucs_trace("ep %p: adding slow-path callback to resume flush", ep);
        uct_worker_progress_register_safe(ep->worker->uct,
                                          ucp_ep_flush_resume_slow_path_callback,
                                          req, 0, &req->send.flush.slow_cb_id);
    }

    if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
        /* Done with this lane's pending queue */
        req->send.lane = UCP_NULL_LANE;
        return UCS_OK;
    } else if (status == UCS_ERR_NO_RESOURCE) {
        /* Still no resources - keep the request on the pending queue */
        return UCS_ERR_NO_RESOURCE;
    } else {
        /* Hard error - record it and remove from the pending queue */
        ucp_ep_flush_error(req, status);
        return UCS_OK;
    }
}

/*
 * UCT completion callback, invoked when a lane flush which returned
 * UCS_INPROGRESS finishes; continues flushing the remaining lanes and
 * completes the request if this was the last one.
 */
static void ucp_ep_flush_completion(uct_completion_t *self, ucs_status_t status)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct_comp);

    ucs_trace("flush completion req=%p status=%d", req, status);

    ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));

    /* NOTE(review): req->status is written only on success - an error
     * status delivered here is dropped. Confirm errors are recorded on
     * another path (e.g. ucp_ep_flush_error on submission). */
    if (status == UCS_OK) {
        req->status = status;
    }

    ucp_ep_flush_progress(req);
    ucp_flush_check_completion(req);
}

void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
Expand Down Expand Up @@ -546,6 +364,8 @@ static void ucp_ep_disconnected(ucp_ep_h ep)
{
ucp_recv_desc_t *rdesc;

ucs_trace("ep %p is disconnected", ep);

while (!ucs_queue_is_empty(&ep->stream_data)) {
rdesc = ucs_queue_pull_elem_non_empty(&ep->stream_data, ucp_recv_desc_t,
stream_queue);
Expand All @@ -571,67 +391,30 @@ static void ucp_ep_disconnected(ucp_ep_h ep)
ucp_ep_destroy_internal(ep, " from disconnect");
}

static void ucp_ep_flushed_callback(ucp_request_t *req)
static unsigned ucp_ep_do_disconnect(void *arg)
{
ucp_ep_disconnected(req->send.ep);
}
ucp_request_t *req = arg;

static ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode)
{
ucs_status_t status;
ucp_request_t *req;
ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));

ucs_debug("disconnect ep %p", ep);
ucp_ep_disconnected(req->send.ep);

if (ep->flags & UCP_EP_FLAG_FAILED) {
ucp_ep_disconnected(ep);
return NULL;
}
/* Complete send request from here, to avoid releasing the request while
* slow-path element is still pending */
ucp_request_complete_send(req, req->status);

req = ucs_mpool_get(&ep->worker->req_mp);
if (req == NULL) {
return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
}
return 0;
}

/*
* Flush operation can be queued on the pending queue of only one of the
* lanes (indicated by req->send.lane) and scheduled for completion on any
* number of lanes. req->send.uct_comp.count keeps track of how many lanes
* are not flushed yet, and when it reaches zero, it means all lanes are
* flushed. req->send.flush.lanes keeps track of which lanes we still have
* to start flush on.
* If a flush is completed from a pending/completion callback, we need to
* schedule slow-path callback to release the endpoint later, since a UCT
* endpoint cannot be released from pending/completion callback context.
*/
req->flags = 0;
req->status = UCS_OK;
req->send.ep = ep;
req->send.flush.flushed_cb = ucp_ep_flushed_callback;
req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep));
req->send.flush.slow_cb_id = UCS_CALLBACKQ_ID_NULL;
req->send.flush.uct_flags = (mode == UCP_EP_CLOSE_MODE_FLUSH) ?
UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL;

req->send.lane = UCP_NULL_LANE;
req->send.uct.func = ucp_ep_flush_progress_pending;
req->send.uct_comp.func = ucp_ep_flush_completion;
req->send.uct_comp.count = ucp_ep_num_lanes(ep);

ucp_ep_flush_progress(req);

if (req->send.uct_comp.count == 0) {
status = req->status;
ucs_trace_req("ep %p: releasing flush request %p, returning status %s",
ep, req, ucs_status_string(status));
ucs_mpool_put(req);
ucp_ep_disconnected(ep);
return UCS_STATUS_PTR(status);
}
static void ucp_ep_close_flushed_callback(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;

ucs_trace_req("ep %p: return inprogress flush request %p (%p)", ep, req,
req + 1);
return req + 1;
ucs_trace("adding slow-path callback to destroy ep %p", ep);
req->send.disconnect.slow_cb_id = UCS_CALLBACKQ_ID_NULL;
uct_worker_progress_register_safe(ep->worker->uct, ucp_ep_do_disconnect,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&req->send.disconnect.slow_cb_id);
}

ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode)
Expand All @@ -647,7 +430,15 @@ ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode)
UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock);

UCS_ASYNC_BLOCK(&worker->async);
request = ucp_disconnect_nb_internal(ep, mode);
request = ucp_ep_flush_internal(ep,
(mode == UCP_EP_CLOSE_MODE_FLUSH) ?
UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL,
NULL, 0,
ucp_ep_close_flushed_callback);
if (!UCS_PTR_IS_PTR(request)) {
ucp_ep_disconnected(ep);
}

UCS_ASYNC_UNBLOCK(&worker->async);

UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock);
Expand Down
5 changes: 5 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,
const char *peer_name, const char *message,
ucp_ep_h *ep_p);

ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
ucp_send_callback_t req_cb,
unsigned req_flags,
ucp_request_callback_t flushed_cb);

ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid,
const char *message, ucp_ep_h *ep_p);

Expand Down
Loading