Skip to content

Commit

Permalink
Merge pull request openucx#5963 from Artemy-Mellanox/topic/flush_canc…
Browse files Browse the repository at this point in the history
…el_nb-10

UCP/EP/CLOSE: Make close EP discard lanes directly
  • Loading branch information
yosefe authored Nov 30, 2020
2 parents 36ed228 + 44c166c commit c786a06
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 98 deletions.
113 changes: 92 additions & 21 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -948,41 +948,112 @@ ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode)
return ucp_ep_close_nbx(ep, &param);
}

/*
 * Stub transport interface used for lanes whose UCT endpoint was discarded
 * (see ucp_ep_discard_lanes(), which points such lanes at ucp_failed_tl_ep).
 *
 * Every endpoint operation is bound to a ucs_empty_function_* stub, so no real
 * transport is reached through this interface. Judging by the stub names
 * (NOTE(review): confirm against the ucs_empty_function_* definitions), the
 * data-path and control operations report an endpoint timeout, ep_pending_add
 * reports "busy", ep_pending_purge reports success and ep_destroy does nothing.
 *
 * The bcopy send operations (put/am/tag_eager) return ssize_t in the UCT API,
 * so they must use the "_bc_" stub: calling a stub that returns a narrower
 * status type through an ssize_t-returning function pointer may yield a value
 * that is not negative, which callers would treat as a packed length.
 * ep_get_bcopy keeps the plain stub - in the UCT API it returns ucs_status_t.
 */
static uct_iface_t ucp_failed_tl_iface = {
    .ops = {
        .ep_put_short        = (uct_ep_put_short_func_t)ucs_empty_function_return_ep_timeout,
        .ep_put_bcopy        = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout,
        .ep_put_zcopy        = (uct_ep_put_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_get_short        = (uct_ep_get_short_func_t)ucs_empty_function_return_ep_timeout,
        .ep_get_bcopy        = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_get_zcopy        = (uct_ep_get_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_am_short         = (uct_ep_am_short_func_t)ucs_empty_function_return_ep_timeout,
        .ep_am_bcopy         = (uct_ep_am_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout,
        .ep_am_zcopy         = (uct_ep_am_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic_cswap64   = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic_cswap32   = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic64_post    = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic32_post    = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic64_fetch   = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic32_fetch   = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_eager_short  = (uct_ep_tag_eager_short_func_t)ucs_empty_function_return_ep_timeout,
        /* ssize_t-returning bcopy send: use the "_bc_" stub like put/am bcopy */
        .ep_tag_eager_bcopy  = (uct_ep_tag_eager_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout,
        .ep_tag_eager_zcopy  = (uct_ep_tag_eager_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_rndv_zcopy   = (uct_ep_tag_rndv_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_rndv_cancel  = (uct_ep_tag_rndv_cancel_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_rndv_request = (uct_ep_tag_rndv_request_func_t)ucs_empty_function_return_ep_timeout,
        .ep_pending_add      = (uct_ep_pending_add_func_t)ucs_empty_function_return_busy,
        .ep_pending_purge    = (uct_ep_pending_purge_func_t)ucs_empty_function_return_success,
        .ep_flush            = (uct_ep_flush_func_t)ucs_empty_function_return_ep_timeout,
        .ep_fence            = (uct_ep_fence_func_t)ucs_empty_function_return_ep_timeout,
        .ep_check            = (uct_ep_check_func_t)ucs_empty_function_return_ep_timeout,
        .ep_connect_to_ep    = (uct_ep_connect_to_ep_func_t)ucs_empty_function_return_ep_timeout,
        .ep_destroy          = (uct_ep_destroy_func_t)ucs_empty_function,
        .ep_get_address      = (uct_ep_get_address_func_t)ucs_empty_function_return_ep_timeout
    }
};

/*
 * Placeholder UCT endpoint bound to the stub interface ucp_failed_tl_iface.
 * ucp_ep_discard_lanes() stores its address in ep->uct_eps[lane] after the
 * lane's real UCT endpoint has been handed over for discarding, so later
 * operations on that lane go to the stub functions.
 */
static uct_ep_t ucp_failed_tl_ep = {
.iface = &ucp_failed_tl_iface
};

/*
 * Hand every existing UCT endpoint of a UCP endpoint over to the worker for
 * discarding, then point the lane at the failed-transport stub endpoint.
 *
 * ep     - UCP endpoint whose lanes are discarded; lanes with a NULL UCT
 *          endpoint are left untouched.
 * status - status wrapped with UCS_STATUS_PTR() and passed as the argument of
 *          the ucp_ep_err_pending_purge callback.
 */
void ucp_ep_discard_lanes(ucp_ep_h ep, ucs_status_t status)
{
    ucp_lane_index_t lane_idx;

    for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(ep); ++lane_idx) {
        if (ep->uct_eps[lane_idx] != NULL) {
            ucs_trace("ep %p: discard uct_ep[%d]=%p", ep, lane_idx,
                      ep->uct_eps[lane_idx]);
            ucp_worker_discard_uct_ep(ep->worker, ep->uct_eps[lane_idx],
                                      UCT_FLUSH_FLAG_CANCEL,
                                      ucp_ep_err_pending_purge,
                                      UCS_STATUS_PTR(status));

            /* The UCT CM lane must not be scheduled on worker progress while
             * being discarded: the UCP EP is going to be destroyed due to peer
             * failure, and ucp_cm_disconnect_cb() could be invoked on the
             * async thread after the UCP EP is destroyed but before the
             * discarding flow destroys the UCT CM EP. In that case the
             * callback would be passed an already-destroyed UCP EP. */
            if (ucp_ep_get_cm_lane(ep) == lane_idx) {
                ucs_assert(!ucp_worker_is_uct_ep_discarding(
                        ep->worker, ep->uct_eps[lane_idx]));
            }

            /* From now on the lane resolves to the stub endpoint */
            ep->uct_eps[lane_idx] = &ucp_failed_tl_ep;
        }
    }
}

ucs_status_ptr_t ucp_ep_close_nbx(ucp_ep_h ep, const ucp_request_param_t *param)
{
ucp_worker_h worker = ep->worker;
int force;
void *request;
void *request = NULL;
ucp_request_t *close_req;
unsigned uct_flags;

force = ucp_request_param_flags(param) & UCP_EP_CLOSE_FLAG_FORCE;
if (force && !ucp_ep_has_cm_lane(ep) &&
if ((ucp_request_param_flags(param) & UCP_EP_CLOSE_FLAG_FORCE) &&
!ucp_ep_has_cm_lane(ep) &&
(ucp_ep_config(ep)->key.err_mode != UCP_ERR_HANDLING_MODE_PEER)) {
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM);
}

UCS_ASYNC_BLOCK(&worker->async);

ep->flags |= UCP_EP_FLAG_CLOSED;
uct_flags = force ? UCT_FLUSH_FLAG_CANCEL : UCT_FLUSH_FLAG_LOCAL;
request = ucp_ep_flush_internal(ep, uct_flags, 0,
&ucp_request_null_param, NULL,
ucp_ep_close_flushed_callback,
"close");
if (!UCS_PTR_IS_PTR(request)) {
if (ucp_ep_is_cm_local_connected(ep) && !force) {
/* lanes already flushed, start disconnect on CM lane */
ucp_ep_cm_disconnect_cm_lane(ep);
close_req = ucp_ep_cm_close_request_get(ep);
if (close_req != NULL) {
request = close_req + 1;
ucp_ep_set_close_request(ep, close_req, "close");

if (ucp_request_param_flags(param) & UCP_EP_CLOSE_FLAG_FORCE) {
if (!(ep->flags & UCP_EP_FLAG_FAILED)) {
ucp_ep_discard_lanes(ep, UCS_ERR_CANCELED);
}

ucp_ep_disconnected(ep, 1);
} else {
request = ucp_ep_flush_internal(ep, 0, &ucp_request_null_param, NULL,
ucp_ep_close_flushed_callback, "close");
if (!UCS_PTR_IS_PTR(request)) {
if (ucp_ep_is_cm_local_connected(ep)) {
/* lanes already flushed, start disconnect on CM lane */
ucp_ep_cm_disconnect_cm_lane(ep);
close_req = ucp_ep_cm_close_request_get(ep);
if (close_req != NULL) {
request = close_req + 1;
ucp_ep_set_close_request(ep, close_req, "close");
} else {
request = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
}
} else {
request = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
ucp_ep_disconnected(ep, 0);
}
} else {
ucp_ep_disconnected(ep, force);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,7 @@ ucs_status_t ucp_ep_create_server_accept(ucp_worker_h worker,
const ucp_conn_request_h conn_request,
ucp_ep_h *ep_p);

ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
unsigned req_flags,
ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
const ucp_request_param_t *param,
ucp_request_t *worker_req,
ucp_request_callback_t flushed_cb,
Expand Down Expand Up @@ -595,6 +594,8 @@ void ucp_ep_flush_completion(uct_completion_t *self);

void ucp_ep_flush_request_ff(ucp_request_t *req, ucs_status_t status);

void ucp_ep_discard_lanes(ucp_ep_h ucp_ep, ucs_status_t status);

/**
* @brief Do keepalive operation.
*
Expand Down
64 changes: 1 addition & 63 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,51 +423,12 @@ void ucp_worker_signal_internal(ucp_worker_h worker)
}
}

/*
 * Stub transport interface used for lanes of a failed endpoint: the error
 * handling progress callback in this file points each discarded lane at
 * ucp_failed_tl_ep, which is bound to this interface.
 *
 * Every endpoint operation is bound to a ucs_empty_function_* stub, so no real
 * transport is reached through this interface. Judging by the stub names
 * (NOTE(review): confirm against the ucs_empty_function_* definitions), the
 * data-path and control operations report an endpoint timeout, ep_pending_add
 * reports "busy", ep_pending_purge reports success and ep_destroy does nothing.
 *
 * The bcopy send operations (put/am/tag_eager) return ssize_t in the UCT API,
 * so they must use the "_bc_" stub: calling a stub that returns a narrower
 * status type through an ssize_t-returning function pointer may yield a value
 * that is not negative, which callers would treat as a packed length.
 * ep_get_bcopy keeps the plain stub - in the UCT API it returns ucs_status_t.
 */
static uct_iface_t ucp_failed_tl_iface = {
    .ops = {
        .ep_put_short        = (uct_ep_put_short_func_t)ucs_empty_function_return_ep_timeout,
        .ep_put_bcopy        = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout,
        .ep_put_zcopy        = (uct_ep_put_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_get_short        = (uct_ep_get_short_func_t)ucs_empty_function_return_ep_timeout,
        .ep_get_bcopy        = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_get_zcopy        = (uct_ep_get_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_am_short         = (uct_ep_am_short_func_t)ucs_empty_function_return_ep_timeout,
        .ep_am_bcopy         = (uct_ep_am_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout,
        .ep_am_zcopy         = (uct_ep_am_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic_cswap64   = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic_cswap32   = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic64_post    = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic32_post    = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic64_fetch   = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_ep_timeout,
        .ep_atomic32_fetch   = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_eager_short  = (uct_ep_tag_eager_short_func_t)ucs_empty_function_return_ep_timeout,
        /* ssize_t-returning bcopy send: use the "_bc_" stub like put/am bcopy */
        .ep_tag_eager_bcopy  = (uct_ep_tag_eager_bcopy_func_t)ucs_empty_function_return_bc_ep_timeout,
        .ep_tag_eager_zcopy  = (uct_ep_tag_eager_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_rndv_zcopy   = (uct_ep_tag_rndv_zcopy_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_rndv_cancel  = (uct_ep_tag_rndv_cancel_func_t)ucs_empty_function_return_ep_timeout,
        .ep_tag_rndv_request = (uct_ep_tag_rndv_request_func_t)ucs_empty_function_return_ep_timeout,
        .ep_pending_add      = (uct_ep_pending_add_func_t)ucs_empty_function_return_busy,
        .ep_pending_purge    = (uct_ep_pending_purge_func_t)ucs_empty_function_return_success,
        .ep_flush            = (uct_ep_flush_func_t)ucs_empty_function_return_ep_timeout,
        .ep_fence            = (uct_ep_fence_func_t)ucs_empty_function_return_ep_timeout,
        .ep_check            = (uct_ep_check_func_t)ucs_empty_function_return_ep_timeout,
        .ep_connect_to_ep    = (uct_ep_connect_to_ep_func_t)ucs_empty_function_return_ep_timeout,
        .ep_destroy          = (uct_ep_destroy_func_t)ucs_empty_function,
        .ep_get_address      = (uct_ep_get_address_func_t)ucs_empty_function_return_ep_timeout
    }
};

/*
 * Placeholder UCT endpoint bound to the stub interface ucp_failed_tl_iface.
 * The error handling progress callback stores its address in
 * ucp_ep->uct_eps[lane] after the lane's real UCT endpoint has been handed
 * over for discarding.
 */
static uct_ep_t ucp_failed_tl_ep = {
.iface = &ucp_failed_tl_iface
};

static unsigned ucp_worker_iface_err_handle_progress(void *arg)
{
ucp_worker_err_handle_arg_t *err_handle_arg = arg;
ucp_ep_h ucp_ep = err_handle_arg->ucp_ep;
ucs_status_t status = err_handle_arg->status;
ucp_worker_h worker = ucp_ep->worker;
ucp_lane_index_t lane;
ucp_request_t *close_req;

UCS_ASYNC_BLOCK(&worker->async);
Expand All @@ -476,30 +437,7 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg)

ucs_assert(ucp_ep->flags & UCP_EP_FLAG_FAILED);

for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) {
if (ucp_ep->uct_eps[lane] == NULL) {
continue;
}

ucs_trace("ep %p: discard uct_ep[%d]=%p", ucp_ep, lane,
ucp_ep->uct_eps[lane]);
ucp_worker_discard_uct_ep(ucp_ep->worker, ucp_ep->uct_eps[lane],
UCT_FLUSH_FLAG_CANCEL,
ucp_ep_err_pending_purge,
UCS_STATUS_PTR(status));
/* UCT CM lane mustn't be scheduled on worker progress when discarding,
* since UCP EP will be destroyed due to peer failure and
* ucp_cm_disconnect_cb() could be invoked on async thread after UCP EP
* is destroyed and before UCT CM EP is destroyed from discarding
* functionality. So, UCP EP will be passed as a corrupted argument to
* ucp_cm_disconnect_cb() */
if (lane == ucp_ep_get_cm_lane(ucp_ep)) {
ucs_assert(!ucp_worker_is_uct_ep_discarding(worker,
ucp_ep->uct_eps[lane]));
}
ucp_ep->uct_eps[lane] = &ucp_failed_tl_ep;
}

ucp_ep_discard_lanes(ucp_ep, status);
ucp_stream_ep_cleanup(ucp_ep);
if (ucp_ep->flags & UCP_EP_FLAG_USED) {
if (ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID) {
Expand Down
16 changes: 6 additions & 10 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ void ucp_ep_flush_remote_completed(ucp_request_t *req)
}
}

ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
unsigned req_flags,
ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
const ucp_request_param_t *param,
ucp_request_t *worker_req,
ucp_request_callback_t flushed_cb,
Expand Down Expand Up @@ -336,7 +335,7 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
req->send.ep = ep;
req->send.flush.flushed_cb = flushed_cb;
req->send.flush.prog_id = UCS_CALLBACKQ_ID_NULL;
req->send.flush.uct_flags = uct_flags;
req->send.flush.uct_flags = UCT_FLUSH_FLAG_LOCAL;
req->send.flush.sw_started = 0;
req->send.flush.sw_done = 0;
req->send.flush.num_lanes = ucp_ep_num_lanes(ep);
Expand Down Expand Up @@ -387,9 +386,8 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_ep_flush_nbx, (ep, param),

UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, 0, param,
NULL, ucp_ep_flushed_callback,
"flush_nbx");
request = ucp_ep_flush_internal(ep, 0, param, NULL,
ucp_ep_flushed_callback, "flush_nbx");

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);

Expand Down Expand Up @@ -485,8 +483,7 @@ static unsigned ucp_worker_flush_progress(void *arg)
req->flush_worker.next_ep = ucs_list_next(&next_ep->ep_list,
ucp_ep_ext_gen_t, ep_list);

ep_flush_request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL,
UCP_REQUEST_FLAG_RELEASED,
ep_flush_request = ucp_ep_flush_internal(ep, UCP_REQUEST_FLAG_RELEASED,
&ucp_request_null_param, req,
ucp_worker_flush_ep_flushed_cb,
"flush_worker");
Expand Down Expand Up @@ -590,8 +587,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep)

UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, 0,
&ucp_request_null_param, NULL,
request = ucp_ep_flush_internal(ep, 0, &ucp_request_null_param, NULL,
ucp_ep_flushed_callback, "flush");
status = ucp_flush_wait(ep->worker, request);

Expand Down
3 changes: 1 addition & 2 deletions src/ucp/wireup/wireup_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,7 @@ static unsigned ucp_ep_cm_remote_disconnect_progress(void *arg)
*/
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID);
ucs_assert(!(ucp_ep->flags & UCP_EP_FLAG_CLOSED));
req = ucp_ep_flush_internal(ucp_ep, UCT_FLUSH_FLAG_LOCAL, 0,
&ucp_request_null_param, NULL,
req = ucp_ep_flush_internal(ucp_ep, 0, &ucp_request_null_param, NULL,
ucp_ep_cm_disconnect_flushed_cb,
"cm_disconnected_cb");
if (req == NULL) {
Expand Down

0 comments on commit c786a06

Please sign in to comment.