Skip to content

Commit

Permalink
UCT/GTEST: Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Apr 12, 2021
1 parent 2f9a391 commit 5f00424
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 23 deletions.
21 changes: 11 additions & 10 deletions src/uct/tcp/tcp_sockcm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ void uct_tcp_sockcm_ep_handle_event_status(uct_tcp_sockcm_ep_t *ep,
((ep->state & UCT_TCP_SOCKCM_EP_ON_SERVER) ? "server" : "client"),
ep, ep->fd, ep->state, events, reason, ucs_status_string(status));

/* if the ep is on the server side but uct_ep_create wasn't called yet,
* destroy the ep here since uct_ep_destroy won't be called either */
/* if the ep is on the server side but uct_ep_create wasn't called yet and
* connection request wasn't prvodied to a user, destroy the ep here since
* uct_ep_destroy won't be called either */
if ((ep->state & (UCT_TCP_SOCKCM_EP_ON_SERVER |
UCT_TCP_SOCKCM_EP_SERVER_CREATED |
UCT_TCP_SOCKCM_EP_SERVER_CONN_REQ_CB_INVOKED)) ==
Expand Down Expand Up @@ -911,6 +912,13 @@ static ucs_status_t uct_tcp_sockcm_ep_server_create(uct_tcp_sockcm_ep_t *tcp_ep,
goto err;
}

UCS_ASYNC_BLOCK(async);

if (tcp_ep->state & UCT_TCP_SOCKCM_EP_FAILED) {
status = UCS_ERR_CONNECTION_RESET;
goto err_unblock;
}

/* check if the server opened this ep, to the client, on a CM that is
* different from the one it created its internal ep on earlier, when it
* received the connection request from the client (the cm used by its listener) */
Expand All @@ -919,17 +927,10 @@ static ucs_status_t uct_tcp_sockcm_ep_server_create(uct_tcp_sockcm_ep_t *tcp_ep,
if (status != UCS_OK) {
ucs_error("failed to remove fd %d from the async handlers: %s",
tcp_ep->fd, ucs_status_string(status));
goto err;
goto err_unblock;
}
}

UCS_ASYNC_BLOCK(async);

if (tcp_ep->state & UCT_TCP_SOCKCM_EP_FAILED) {
status = UCS_ERR_CONNECTION_RESET;
goto err_unblock;
}

/* set the server's ep to use the cm from params.
* (it could be the previous one it had - the one used by the listener or
* a new one set by the user) */
Expand Down
20 changes: 19 additions & 1 deletion test/gtest/ucp/test_ucp_sockaddr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,8 @@ UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_sforce) {
one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

/* The test check that a client disconenction works fine when a server received
* a conenction request, but a conenction wasn't fully established */
UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, create_and_destroy_immediately)
{
ucp_test_base::entity::listen_cb_type_t listen_cb_type = cb_type();
Expand All @@ -1195,23 +1197,39 @@ UCS_TEST_P(test_ucp_sockaddr_destroy_ep_on_err, create_and_destroy_immediately)
client_ep_connect();

if (listen_cb_type == ucp_test_base::entity::LISTEN_CB_CONN) {
/* Wait for either connection to a peer failed (e.g. no TL to create
* after CM created a connection) or connection request is provided
* by UCP */
while ((m_err_count == 0) &&
receiver().is_conn_reqs_queue_empty()) {
progress();
}
} else {
/* Wait for EP being created on a server side */
ASSERT_EQ(ucp_test_base::entity::LISTEN_CB_EP, listen_cb_type);
if (!wait_for_server_ep(false)) {
UCS_TEST_SKIP_R("cannot connect to server");
}
}

/* Disconnect from a peer while conenction is not fully established with
* a peer */
one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE);
while ((m_err_count == 0) && (receiver().get_err_num() == 0)) {

/* Wait until either accepting a connection fails on a server side or
* disconnection is detected by a server in case of a connection was
* established successfully */
ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(10.0);
while ((ucs_get_time() < loop_end_limit) &&
(m_err_count == 0) && (receiver().get_accept_err_num() == 0)) {
progress();
}

EXPECT_TRUE((m_err_count != 0) ||
(receiver().get_accept_err_num() != 0));
}

/* Disconnect from a client if a connection was established */
one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
}

Expand Down
24 changes: 13 additions & 11 deletions test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ ucp_test_base::entity::entity(const ucp_test_param& test_param,
ucp_config_t* ucp_config,
const ucp_worker_params_t& worker_params,
const ucp_test_base *test_owner)
: m_err_cntr(0), m_rejected_cntr(0)
: m_err_cntr(0), m_rejected_cntr(0), m_accept_err_cntr(0)
{
const int thread_type = test_param.variant.thread_type;
ucp_params_t local_ctx_params = test_param.variant.ctx_params;
Expand Down Expand Up @@ -616,9 +616,10 @@ bool ucp_test_base::entity::verify_client_address(struct sockaddr_storage
return false;
}

ucp_ep_h ucp_test_base::entity::accept(ucp_worker_h worker,
ucp_conn_request_h conn_request)
void ucp_test_base::entity::accept(int worker_index,
ucp_conn_request_h conn_request)
{
ucp_worker_h ucp_worker = worker(worker_index);
ucp_ep_params_t ep_params = *m_server_ep_params;
ucp_conn_request_attr_t attr;
ucs_status_t status;
Expand All @@ -636,17 +637,17 @@ ucp_ep_h ucp_test_base::entity::accept(ucp_worker_h worker,
ep_params.user_data = reinterpret_cast<void*>(this);
ep_params.conn_request = conn_request;

status = ucp_ep_create(worker, &ep_params, &ep);
status = ucp_ep_create(ucp_worker, &ep_params, &ep);
if (status == UCS_ERR_UNREACHABLE) {
UCS_TEST_SKIP_R("Skipping due an unreachable destination (unsupported "
"feature or no supported transport to send partial "
"worker address)");
} else if (status != UCS_OK) {
add_err(status);
ep = NULL;
++m_accept_err_cntr;
return;
}

return ep;
set_ep(ep, worker_index, std::numeric_limits<int>::max());
}


Expand Down Expand Up @@ -879,10 +880,7 @@ unsigned ucp_test_base::entity::progress(int worker_index)
if (!m_conn_reqs.empty()) {
ucp_conn_request_h conn_req = m_conn_reqs.back();
m_conn_reqs.pop();
ucp_ep_h ep = accept(ucp_worker, conn_req);
if (ep != NULL) {
set_ep(ep, worker_index, std::numeric_limits<int>::max());
}
accept(worker_index, conn_req);
++progress_count;
}

Expand Down Expand Up @@ -917,6 +915,10 @@ const size_t &ucp_test_base::entity::get_err_num() const {
return m_err_cntr;
}

const size_t &ucp_test_base::entity::get_accept_err_num() const {
return m_accept_err_cntr;
}

void ucp_test_base::entity::warn_existing_eps() const {
for (size_t worker_index = 0; worker_index < m_workers.size(); ++worker_index) {
for (size_t ep_index = 0; ep_index < m_workers[worker_index].second.size();
Expand Down
5 changes: 4 additions & 1 deletion test/gtest/ucp/ucp_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ucp_test_base : public ucs::test_base {

bool verify_client_address(struct sockaddr_storage *client_address);

ucp_ep_h accept(ucp_worker_h worker, ucp_conn_request_h conn_request);
void accept(int worker_index, ucp_conn_request_h conn_request);

void* modify_ep(const ucp_ep_params_t& ep_params, int worker_idx = 0,
int ep_idx = 0);
Expand Down Expand Up @@ -141,6 +141,8 @@ class ucp_test_base : public ucs::test_base {

const size_t &get_err_num() const;

const size_t &get_accept_err_num() const;

void warn_existing_eps() const;

double set_ib_ud_timeout(double timeout_sec);
Expand All @@ -161,6 +163,7 @@ class ucp_test_base : public ucs::test_base {
close_ep_reqs_t m_close_ep_reqs;
size_t m_err_cntr;
size_t m_rejected_cntr;
size_t m_accept_err_cntr;
ucs::handle<ucp_ep_params_t*> m_server_ep_params;

private:
Expand Down

0 comments on commit 5f00424

Please sign in to comment.