Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IODEMO: non blocking connection establishment #6338

Merged
merged 1 commit into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 80 additions & 47 deletions test/apps/iodemo/io_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <limits>
gleon99 marked this conversation as resolved.
Show resolved Hide resolved
#include <malloc.h>
#include <dlfcn.h>
#include <set>

#ifdef HAVE_CUDA
#include <cuda.h>
Expand Down Expand Up @@ -734,12 +735,6 @@ class P2pDemoCommon : public UcxContext {
validate(msg, iomsg_size);
}

static double get_time() {
    // Wall-clock time in seconds; the microsecond part of the reading
    // becomes the fractional component of the result.
    struct timeval now;
    gettimeofday(&now, NULL);
    const double fraction = now.tv_usec * 1e-6;
    return now.tv_sec + fraction;
}

private:
bool send_io_message(UcxConnection *conn, IoMessage *msg) {
VERBOSE_LOG << "sending IO " << io_op_names[msg->msg()->op] << ", sn "
Expand Down Expand Up @@ -835,10 +830,6 @@ class DemoServer : public P2pDemoCommon {
save_prev_state();
}

// Destructor: calls destroy_connections() before the server object goes away.
virtual ~DemoServer() {
destroy_connections();
}

void run() {
struct sockaddr_in listen_addr;
memset(&listen_addr, 0, sizeof(listen_addr));
Expand Down Expand Up @@ -966,6 +957,7 @@ class DemoServer : public P2pDemoCommon {
// Connection error handler: logs the UCX status of the failed connection,
// decrements the active connection counter and deletes the connection object.
// 'conn' must not be used by the caller after this returns.
virtual void dispatch_connection_error(UcxConnection *conn) {
LOG << "deleting connection with status "
<< ucs_status_string(conn->ucx_status());
// NOTE(review): presumably active_conns counts only established
// connections, hence the check before decrementing - confirm.
assert(conn->is_established());
--_curr_state.active_conns;
delete conn;
}
Expand Down Expand Up @@ -1042,12 +1034,37 @@ class DemoClient : public P2pDemoCommon {
/* Per-server bookkeeping kept by the client, one entry per server index */
typedef struct {
UcxConnection* conn; /* Connection to the server, NULL if there is none */
long retry_count; /* Connect retry counter */
double prev_connect_time; /* timestamp of last connect attempt */
long num_sent; /* Total number of sent operations */
size_t active_index; /* Index in active vector */
long num_completed[IO_OP_MAX]; /* Number of completed operations */
long prev_completed[IO_OP_MAX]; /* Completed in last report */
} server_info_t;

/* Completion callback for a connection attempt to a single server.
 * The object deletes itself once it has been invoked. */
class ConnectCallback : public UcxCallback {
public:
    ConnectCallback(DemoClient &client, size_t server_idx) :
        _client(client), _server_idx(server_idx)
    {
    }

    virtual void operator()(ucs_status_t status)
    {
        // Report the outcome of the attempt to the owning client
        if (status != UCS_OK) {
            _client.connect_failed(_server_idx);
        } else {
            _client.connect_succeed(_server_idx);
        }

        // The attempt is over either way, so the server is no longer
        // "connecting"
        _client._connecting_servers.erase(_server_idx);

        // Nothing may touch this object past this point
        delete this;
    }

private:
    DemoClient   &_client;
    const size_t _server_idx;
};

class IoReadResponseCallback : public UcxCallback {
public:
IoReadResponseCallback(size_t buffer_size,
Expand Down Expand Up @@ -1125,20 +1142,14 @@ class DemoClient : public P2pDemoCommon {
DemoClient(const options_t &test_opts) :
P2pDemoCommon(test_opts),
_num_active_servers_to_use(0),
_prev_connect_time(0),
_num_sent(0),
_num_completed(0),
_status(OK),
_start_time(get_time()),
_read_callback_pool(opts().iomsg_size, "read callbacks",
UCS_MEMORY_TYPE_HOST)
_read_callback_pool(opts().iomsg_size, "read callbacks")
{
}

// Destructor: calls destroy_connections() before the client object goes away.
virtual ~DemoClient() {
destroy_connections();
}

typedef enum {
OK,
CONN_RETRIES_EXCEEDED,
Expand Down Expand Up @@ -1167,6 +1178,7 @@ class DemoClient : public P2pDemoCommon {
}

void handle_operation_completion(size_t server_index, io_op_t op) {
assert(server_index < _server_info.size());
server_info_t& server_info = _server_info[server_index];

assert(get_num_uncompleted(server_info) <= opts().conn_window_size);
Expand Down Expand Up @@ -1346,7 +1358,7 @@ class DemoClient : public P2pDemoCommon {
}
}

long get_num_uncompleted(const server_info_t& server_info) const {
static long get_num_uncompleted(const server_info_t& server_info) {
long num_uncompleted;

num_uncompleted = server_info.num_sent -
Expand Down Expand Up @@ -1434,7 +1446,9 @@ class DemoClient : public P2pDemoCommon {
}
}

UcxConnection* connect(const char* server) {
void connect(size_t server_index)
{
const char *server = opts().servers[server_index];
struct sockaddr_in connect_addr;
std::string server_addr;
int port_num;
Expand All @@ -1458,11 +1472,19 @@ class DemoClient : public P2pDemoCommon {
int ret = inet_pton(AF_INET, server_addr.c_str(), &connect_addr.sin_addr);
if (ret != 1) {
LOG << "invalid address " << server_addr;
return NULL;
abort();
}

if (!_connecting_servers.insert(server_index).second) {
yosefe marked this conversation as resolved.
Show resolved Hide resolved
LOG << server_name(server_index) << " is already connecting";
abort();
}

return UcxContext::connect((const struct sockaddr*)&connect_addr,
sizeof(connect_addr));
UcxConnection *conn = new UcxConnection(*this, opts().use_am);
_server_info[server_index].conn = conn;
conn->connect((const struct sockaddr*)&connect_addr,
sizeof(connect_addr),
new ConnectCallback(*this, server_index));
}

const std::string server_name(size_t server_index) {
Expand All @@ -1471,9 +1493,23 @@ class DemoClient : public P2pDemoCommon {
return ss.str();
}

void connect_succeed(size_t server_index)
{
server_info_t &server_info = _server_info[server_index];

server_info.retry_count = 0;
server_info.prev_connect_time = 0.;
_server_index_lookup[server_info.conn] = server_index;
active_servers_add(server_index);
LOG << "Connected to " << server_name(server_index);
}

void connect_failed(size_t server_index) {
server_info_t& server_info = _server_info[server_index];

// The connection should close itself calling error handler
server_info.conn = NULL;

++server_info.retry_count;

UcxLog log(LOG_PREFIX);
Expand Down Expand Up @@ -1504,16 +1540,11 @@ class DemoClient : public P2pDemoCommon {
}

double curr_time = get_time();
if (curr_time < (_prev_connect_time + opts().retry_interval)) {
// Not enough time elapsed since previous connection attempt
return;
}

for (size_t server_index = 0; server_index < _server_info.size();
++server_index) {
server_info_t& server_info = _server_info[server_index];
if (server_info.conn != NULL) {
// Server is already connected
// Already connecting to this server
continue;
}

Expand All @@ -1522,23 +1553,17 @@ class DemoClient : public P2pDemoCommon {
assert(_status == OK);
assert(server_info.retry_count < opts().retries);

server_info.conn = connect(opts().servers[server_index]);
if (server_info.conn == NULL) {
connect_failed(server_index);
if (_status != OK) {
break;
}
if (curr_time < (server_info.prev_connect_time +
opts().retry_interval)) {
// Not enough time elapsed since previous connection attempt
continue;
}

server_info.retry_count = 0;
_server_index_lookup[server_info.conn] = server_index;
active_servers_add(server_index);

LOG << "Connected to " << server_name(server_index);
connect(server_index);
server_info.prev_connect_time = curr_time;
assert(server_info.conn != NULL);
assert(_status == OK);
Copy link
Contributor

@gleon99 gleon99 Feb 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that _status cannot be modified by a call to connect(), in which case there is no point in checking it (after the check on line 1359)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO there is a point as long as the Connection implementation can affect the UcxContext state.
I replaced the if condition with an assert because of how connect() was changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where connect() can modify _status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

via callback

}

_prev_connect_time = curr_time;
}

size_t pick_server_index() const {
Expand Down Expand Up @@ -1583,10 +1608,14 @@ class DemoClient : public P2pDemoCommon {
}

if (_active_servers.empty()) {
LOG << "All remote servers are down, reconnecting in "
<< opts().retry_interval << " seconds";
sleep(opts().retry_interval);
check_time_limit(get_time());
if (_connecting_servers.empty()) {
LOG << "All remote servers are down, reconnecting in "
<< opts().retry_interval << " seconds";
sleep(opts().retry_interval);
check_time_limit(get_time());
} else {
progress();
}
continue;
}

Expand Down Expand Up @@ -1664,7 +1693,7 @@ class DemoClient : public P2pDemoCommon {

for (size_t server_index = 0; server_index < _server_info.size();
++server_index) {
LOG << "Disconnecting from server " << server_name(server_index);
LOG << "Disconnecting from " << server_name(server_index);
delete _server_info[server_index].conn;
_server_info[server_index].conn = NULL;
}
Expand Down Expand Up @@ -1822,10 +1851,14 @@ class DemoClient : public P2pDemoCommon {

private:
std::vector<server_info_t> _server_info;
// Connection establishment is in progress
std::set<size_t> _connecting_servers;
// Active servers is the list of communicating servers
std::vector<size_t> _active_servers;
// Num active servers to use handles window size, server becomes "unused" if
// its window is full
size_t _num_active_servers_to_use;
std::map<const UcxConnection*, size_t> _server_index_lookup;
double _prev_connect_time;
long _num_sent;
long _num_completed;
status_t _status;
Expand Down
Loading