Skip to content

Commit

Permalink
Merge pull request #3064 from yosefe/topic/flush-fixes-v1.5.x-2
Browse files Browse the repository at this point in the history
Fixes for endpoint flush - v1.5.x
  • Loading branch information
yosefe authored Nov 28, 2018
2 parents 136270c + 71c0811 commit 02078b9
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ static UCS_F_ALWAYS_INLINE ucp_ep_flush_state_t* ucp_ep_flush_state(ucp_ep_h ep)
{
ucs_assert(ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID);
ucs_assert(!(ep->flags & UCP_EP_FLAG_ON_MATCH_CTX));
ucs_assert(!(ep->flags & UCP_EP_FLAG_LISTENER));
return &ucp_ep_ext_gen(ep)->flush_state;
}

Expand Down
37 changes: 25 additions & 12 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,31 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
}

if (!req->send.flush.sw_started && (req->send.state.uct_comp.count == 0)) {
/* we start the SW flush only after all lanes are flushed, so we are sure
* all our requests were sent, to get the right "num_sent" counter
/* Start waiting for remote completions only after all lanes are flushed
* on the transport level, so we are sure all pending requests were sent.
* We don't need to wait for remote completions in these cases:
* - The flush operation is in 'cancel' mode
* - The endpoint is either not used or did not resolve the peer endpoint,
* which means we didn't have any user operations which require remote
* completion. In this case, the flush state may not even be initialized.
*/
flush_state = (ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID) ?
ucp_ep_flush_state(ep) : NULL;
if ((flush_state == NULL) || (flush_state->send_sn == flush_state->cmpl_sn)) {
if ((req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) ||
!ucs_test_all_flags(ep->flags, UCP_EP_FLAG_USED|UCP_EP_FLAG_DEST_EP)) {
ucs_trace_req("flush request %p not waiting for remote completions",
req);
req->send.flush.sw_done = 1;
ucs_trace_req("flush request %p remote completions done", req);
} else {
req->send.flush.cmpl_sn = flush_state->send_sn;
ucs_queue_push(&flush_state->reqs, &req->send.flush.queue);
ucs_trace_req("added flush request %p to ep remote completion queue"
" with sn %d", req, req->send.flush.cmpl_sn);
/* All pending requests were sent, so 'send_sn' value is up-to-date */
flush_state = ucp_ep_flush_state(ep);
if (flush_state->send_sn == flush_state->cmpl_sn) {
req->send.flush.sw_done = 1;
ucs_trace_req("flush request %p remote completions done", req);
} else {
req->send.flush.cmpl_sn = flush_state->send_sn;
ucs_queue_push(&flush_state->reqs, &req->send.flush.queue);
ucs_trace_req("added flush request %p to ep remote completion queue"
" with sn %d", req, req->send.flush.cmpl_sn);
}
}
req->send.flush.sw_started = 1;
}
Expand Down Expand Up @@ -381,8 +393,9 @@ static unsigned ucp_worker_flush_progress(void *arg)
req->flush_worker.next_ep = ucs_list_next(&next_ep->ep_list,
ucp_ep_ext_gen_t, ep_list);

ep_flush_request = ucp_ep_flush_internal(ep, 0, NULL, UCP_REQUEST_FLAG_RELEASED,
req, ucp_worker_flush_ep_flushed_cb,
ep_flush_request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL,
UCP_REQUEST_FLAG_RELEASED, req,
ucp_worker_flush_ep_flushed_cb,
"flush_worker");
if (UCS_PTR_IS_ERR(ep_flush_request)) {
/* endpoint flush resulted in an error */
Expand Down
3 changes: 1 addition & 2 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
/* wireup request for a specific ep */
ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);
ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
if (!(ep->flags & UCP_EP_FLAG_LISTENER) &&
ucp_ep_config(ep)->p2p_lanes) {
if (!(ep->flags & UCP_EP_FLAG_LISTENER)) {
/* Reset the flush state only if this is not a client-server wireup on
* the server side with long address exchange, where the listener (united
* with the flush state) must remain valid until the user's callback is
* invoked */
Expand Down
11 changes: 8 additions & 3 deletions src/uct/sm/mm/mm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,15 @@ ucs_status_t uct_mm_ep_flush(uct_ep_h tl_ep, unsigned flags,
{
uct_mm_ep_t *ep = ucs_derived_of(tl_ep, uct_mm_ep_t);

uct_mm_ep_update_cached_tail(ep);

if (!uct_mm_ep_has_tx_resources(ep)) {
return UCS_ERR_NO_RESOURCE;
if (!ucs_arbiter_group_is_empty(&ep->arb_group)) {
return UCS_ERR_NO_RESOURCE;
} else {
uct_mm_ep_update_cached_tail(ep);
if (!uct_mm_ep_has_tx_resources(ep)) {
return UCS_ERR_NO_RESOURCE;
}
}
}

ucs_memory_cpu_store_fence();
Expand Down
4 changes: 2 additions & 2 deletions test/gtest/ucs/test_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class stats_test : public ucs::test {
void prepare_nodes(ucs_stats_node_t **cat_node,
ucs_stats_node_t *data_nodes[NUM_DATA_NODES]) {
static ucs_stats_class_t category_stats_class = {
"category", 0, {}
"category", 0
};

ucs_status_t status = UCS_STATS_NODE_ALLOC(cat_node,
Expand Down Expand Up @@ -257,7 +257,7 @@ UCS_TEST_F(stats_on_demand_test, null_root) {
ucs_stats_node_t *cat_node;

static ucs_stats_class_t category_stats_class = {
"category", 0, {}
"category", 0
};
ucs_status_t status = UCS_STATS_NODE_ALLOC(&cat_node, &category_stats_class,
NULL);
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/ucs/test_stats_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class stats_filter_test : public ucs::test {

void prepare_nodes() {
static ucs_stats_class_t category_stats_class = {
"category", 0, {}
"category", 0
};

ucs_status_t status = UCS_STATS_NODE_ALLOC(&cat_node, &category_stats_class,
Expand Down

0 comments on commit 02078b9

Please sign in to comment.