Skip to content

Commit

Permalink
UCP/EP: Fixes for endpoint flush for software rma/amo operations
Browse files Browse the repository at this point in the history
- Ignore remote completions in case of forced flush (CLOSE_MODE_CANCEL)

- The UCP_EP_FLAG_FLUSH_STATE_VALID flag can't be used because it's not
  valid in release mode. Instead, use the UCP_EP_FLAG_DEST_EP flag as an
  indirect indication that we may have some operations which wait for
  software rma/amo completion.

- Fix missing initializion of flush state in case of client/server
  connection establishment with p2p lanes and *without* pre-request.
  • Loading branch information
yosefe committed Nov 24, 2018
1 parent 1ac5745 commit 3592e83
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ static UCS_F_ALWAYS_INLINE ucp_ep_flush_state_t* ucp_ep_flush_state(ucp_ep_h ep)
{
ucs_assert(ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID);
ucs_assert(!(ep->flags & UCP_EP_FLAG_ON_MATCH_CTX));
ucs_assert(!(ep->flags & UCP_EP_FLAG_LISTENER));
return &ucp_ep_ext_gen(ep)->flush_state;
}

Expand Down
37 changes: 25 additions & 12 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,31 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
}

if (!req->send.flush.sw_started && (req->send.state.uct_comp.count == 0)) {
/* we start the SW flush only after all lanes are flushed, so we are sure
* all our requests were sent, to get the right "num_sent" counter
/* Start waiting for remote completions only after all lanes are flushed
* on the transport level, so we are sure all pending requests were sent.
* We don't need to wait for remote completions in these cases:
* - The flush operation is in 'cancel' mode
* - The endpoint is either not used or did not resolve the peer endpoint,
* which means we didn't have any user operations which require remote
* completion. In this case, the flush state may not even be initialized.
*/
flush_state = (ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID) ?
ucp_ep_flush_state(ep) : NULL;
if ((flush_state == NULL) || (flush_state->send_sn == flush_state->cmpl_sn)) {
if ((req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) ||
!ucs_test_all_flags(ep->flags, UCP_EP_FLAG_USED|UCP_EP_FLAG_DEST_EP)) {
ucs_trace_req("flush request %p not waiting for remote completions",
req);
req->send.flush.sw_done = 1;
ucs_trace_req("flush request %p remote completions done", req);
} else {
req->send.flush.cmpl_sn = flush_state->send_sn;
ucs_queue_push(&flush_state->reqs, &req->send.flush.queue);
ucs_trace_req("added flush request %p to ep remote completion queue"
" with sn %d", req, req->send.flush.cmpl_sn);
/* All pending requires were sent, so 'send_sn' value is up-to-date */
flush_state = ucp_ep_flush_state(ep);
if (flush_state->send_sn == flush_state->cmpl_sn) {
req->send.flush.sw_done = 1;
ucs_trace_req("flush request %p remote completions done", req);
} else {
req->send.flush.cmpl_sn = flush_state->send_sn;
ucs_queue_push(&flush_state->reqs, &req->send.flush.queue);
ucs_trace_req("added flush request %p to ep remote completion queue"
" with sn %d", req, req->send.flush.cmpl_sn);
}
}
req->send.flush.sw_started = 1;
}
Expand Down Expand Up @@ -381,8 +393,9 @@ static unsigned ucp_worker_flush_progress(void *arg)
req->flush_worker.next_ep = ucs_list_next(&next_ep->ep_list,
ucp_ep_ext_gen_t, ep_list);

ep_flush_request = ucp_ep_flush_internal(ep, 0, NULL, UCP_REQUEST_FLAG_RELEASED,
req, ucp_worker_flush_ep_flushed_cb,
ep_flush_request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL,
UCP_REQUEST_FLAG_RELEASED, req,
ucp_worker_flush_ep_flushed_cb,
"flush_worker");
if (UCS_PTR_IS_ERR(ep_flush_request)) {
/* endpoint flush resulted in an error */
Expand Down
3 changes: 1 addition & 2 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
/* wireup request for a specific ep */
ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);
ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
if (!(ep->flags & UCP_EP_FLAG_LISTENER) &&
ucp_ep_config(ep)->p2p_lanes) {
if (!(ep->flags & UCP_EP_FLAG_LISTENER)) {
/* Reset flush state only if it's not a client-server wireup on
* server side with long address exchange when listener (united with
* flush state) should be valid until user's callback invoking */
Expand Down

0 comments on commit 3592e83

Please sign in to comment.