diff --git a/src/ucp/core/ucp_ep.inl b/src/ucp/core/ucp_ep.inl index 47afcc81c0c..fbce4986976 100644 --- a/src/ucp/core/ucp_ep.inl +++ b/src/ucp/core/ucp_ep.inl @@ -136,6 +136,7 @@ static UCS_F_ALWAYS_INLINE ucp_ep_flush_state_t* ucp_ep_flush_state(ucp_ep_h ep) { ucs_assert(ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID); ucs_assert(!(ep->flags & UCP_EP_FLAG_ON_MATCH_CTX)); + ucs_assert(!(ep->flags & UCP_EP_FLAG_LISTENER)); return &ucp_ep_ext_gen(ep)->flush_state; } diff --git a/src/ucp/rma/flush.c b/src/ucp/rma/flush.c index 418e5171798..05c8ceabcf0 100644 --- a/src/ucp/rma/flush.c +++ b/src/ucp/rma/flush.c @@ -89,19 +89,31 @@ static void ucp_ep_flush_progress(ucp_request_t *req) } if (!req->send.flush.sw_started && (req->send.state.uct_comp.count == 0)) { - /* we start the SW flush only after all lanes are flushed, so we are sure - * all our requests were sent, to get the right "num_sent" counter + /* Start waiting for remote completions only after all lanes are flushed + * on the transport level, so we are sure all pending requests were sent. + * We don't need to wait for remote completions in these cases: + * - The flush operation is in 'cancel' mode + * - The endpoint is either not used or did not resolve the peer endpoint, + * which means we didn't have any user operations which require remote + * completion. In this case, the flush state may not even be initialized. */ - flush_state = (ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID) ? - ucp_ep_flush_state(ep) : NULL; - if ((flush_state == NULL) || (flush_state->send_sn == flush_state->cmpl_sn)) { + if ((req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) || + !ucs_test_all_flags(ep->flags, UCP_EP_FLAG_USED|UCP_EP_FLAG_DEST_EP)) { + ucs_trace_req("flush request %p not waiting for remote completions", + req); req->send.flush.sw_done = 1; - ucs_trace_req("flush request %p remote completions done", req); } else { - req->send.flush.cmpl_sn = flush_state->send_sn; - ucs_queue_push(&flush_state->reqs, &req->send.flush.queue); - ucs_trace_req("added flush request %p to ep remote completion queue" - " with sn %d", req, req->send.flush.cmpl_sn); + /* All pending requests were sent, so 'send_sn' value is up-to-date */ + flush_state = ucp_ep_flush_state(ep); + if (flush_state->send_sn == flush_state->cmpl_sn) { + req->send.flush.sw_done = 1; + ucs_trace_req("flush request %p remote completions done", req); + } else { + req->send.flush.cmpl_sn = flush_state->send_sn; + ucs_queue_push(&flush_state->reqs, &req->send.flush.queue); + ucs_trace_req("added flush request %p to ep remote completion queue" + " with sn %d", req, req->send.flush.cmpl_sn); + } } req->send.flush.sw_started = 1; } @@ -381,8 +393,9 @@ static unsigned ucp_worker_flush_progress(void *arg) req->flush_worker.next_ep = ucs_list_next(&next_ep->ep_list, ucp_ep_ext_gen_t, ep_list); - ep_flush_request = ucp_ep_flush_internal(ep, 0, NULL, UCP_REQUEST_FLAG_RELEASED, - req, ucp_worker_flush_ep_flushed_cb, + ep_flush_request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL, + UCP_REQUEST_FLAG_RELEASED, req, + ucp_worker_flush_ep_flushed_cb, "flush_worker"); if (UCS_PTR_IS_ERR(ep_flush_request)) { /* endpoint flush resulted in an error */ diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 6cf71b2dc2a..56bcec85cc3 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -308,8 +308,7 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, /* wireup request for a specific ep */ ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr); ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr); - if (!(ep->flags & UCP_EP_FLAG_LISTENER) && - ucp_ep_config(ep)->p2p_lanes) { + if (!(ep->flags & UCP_EP_FLAG_LISTENER)) { /* Reset flush state only if it's not a client-server wireup on * server side with long address exchange when listener (united with * flush state) should be valid until user's callback invoking */ diff --git a/src/uct/sm/mm/mm_ep.c b/src/uct/sm/mm/mm_ep.c index 2c66d387802..5d369934866 100644 --- a/src/uct/sm/mm/mm_ep.c +++ b/src/uct/sm/mm/mm_ep.c @@ -419,10 +419,15 @@ ucs_status_t uct_mm_ep_flush(uct_ep_h tl_ep, unsigned flags, { uct_mm_ep_t *ep = ucs_derived_of(tl_ep, uct_mm_ep_t); - uct_mm_ep_update_cached_tail(ep); - if (!uct_mm_ep_has_tx_resources(ep)) { - return UCS_ERR_NO_RESOURCE; + if (!ucs_arbiter_group_is_empty(&ep->arb_group)) { + return UCS_ERR_NO_RESOURCE; + } else { + uct_mm_ep_update_cached_tail(ep); + if (!uct_mm_ep_has_tx_resources(ep)) { + return UCS_ERR_NO_RESOURCE; + } + } } ucs_memory_cpu_store_fence(); diff --git a/test/gtest/ucs/test_stats.cc b/test/gtest/ucs/test_stats.cc index 9dfc3693a64..d29dfa05cbd 100644 --- a/test/gtest/ucs/test_stats.cc +++ b/test/gtest/ucs/test_stats.cc @@ -57,7 +57,7 @@ class stats_test : public ucs::test { void prepare_nodes(ucs_stats_node_t **cat_node, ucs_stats_node_t *data_nodes[NUM_DATA_NODES]) { static ucs_stats_class_t category_stats_class = { - "category", 0, {} + "category", 0 }; ucs_status_t status = UCS_STATS_NODE_ALLOC(cat_node, @@ -257,7 +257,7 @@ UCS_TEST_F(stats_on_demand_test, null_root) { ucs_stats_node_t *cat_node; static ucs_stats_class_t category_stats_class = { - "category", 0, {} + "category", 0 }; ucs_status_t status = UCS_STATS_NODE_ALLOC(&cat_node, &category_stats_class, NULL); diff --git a/test/gtest/ucs/test_stats_filter.cc b/test/gtest/ucs/test_stats_filter.cc index acaa9bad8c0..9dcfa20028d 100644 --- a/test/gtest/ucs/test_stats_filter.cc +++ b/test/gtest/ucs/test_stats_filter.cc @@ -63,7 +63,7 @@ class stats_filter_test : public ucs::test { void prepare_nodes() { static ucs_stats_class_t category_stats_class = { - "category", 0, {} + "category", 0 }; ucs_status_t status = UCS_STATS_NODE_ALLOC(&cat_node, &category_stats_class,