Skip to content

Commit

Permalink
Merge pull request openucx#5957 from evgeny-leksikov/iodemo_listen_retry
Browse files Browse the repository at this point in the history
IODEMO: add listen retry for server
  • Loading branch information
yosefe authored Dec 1, 2020
2 parents 86755f6 + 2c8c157 commit c197746
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 21 deletions.
61 changes: 44 additions & 17 deletions test/apps/iodemo/io_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ typedef struct {
int port_num;
double connect_timeout;
double client_timeout;
long client_retries;
double client_retry_interval;
long retries;
double retry_interval;
double client_runtime_limit;
double print_interval;
size_t iomsg_size;
Expand Down Expand Up @@ -645,7 +645,33 @@ class DemoServer : public P2pDemoCommon {
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons(opts().port_num);

listen((const struct sockaddr*)&listen_addr, sizeof(listen_addr));
for (long retry = 1;; ++retry) {
if (listen((const struct sockaddr*)&listen_addr,
sizeof(listen_addr))) {
break;
}

if (retry > opts().retries) {
return;
}

{
UcxLog log(LOG_PREFIX);
log << "restarting listener on "
<< UcxContext::sockaddr_str((struct sockaddr*)&listen_addr,
sizeof(listen_addr))
<< " in " << opts().retry_interval << " seconds (retry "
<< retry;

if (opts().retries < std::numeric_limits<long>::max()) {
log << "/" << opts().retries;
}

log << ")";
}

sleep(opts().retry_interval);
}

for (double prev_time = 0.0; ;) {
try {
Expand Down Expand Up @@ -1213,12 +1239,12 @@ class DemoClient : public P2pDemoCommon {
UcxLog log(LOG_PREFIX);
log << "Connect to " << server_name(server_index) << " failed"
<< " (retry " << server_info.retry_count;
if (opts().client_retries < std::numeric_limits<long>::max()) {
log << "/" << opts().client_retries;
if (opts().retries < std::numeric_limits<long>::max()) {
log << "/" << opts().retries;
}
log << ")";

if (server_info.retry_count >= opts().client_retries) {
if (server_info.retry_count >= opts().retries) {
/* If at least one server exceeded its retries, bail */
_status = CONN_RETRIES_EXCEEDED;
}
Expand All @@ -1238,7 +1264,7 @@ class DemoClient : public P2pDemoCommon {
}

double curr_time = get_time();
if (curr_time < (_prev_connect_time + opts().client_retry_interval)) {
if (curr_time < (_prev_connect_time + opts().retry_interval)) {
// Not enough time elapsed since previous connection attempt
return;
}
Expand All @@ -1254,7 +1280,7 @@ class DemoClient : public P2pDemoCommon {
// If retry count exceeded for at least one server, we should have
// exited already
assert(_status == OK);
assert(server_info.retry_count < opts().client_retries);
assert(server_info.retry_count < opts().retries);

server_info.conn = connect(opts().servers[server_index]);
if (server_info.conn == NULL) {
Expand Down Expand Up @@ -1313,8 +1339,8 @@ class DemoClient : public P2pDemoCommon {

if (_active_servers.empty()) {
LOG << "All remote servers are down, reconnecting in "
<< opts().client_retry_interval << " seconds";
sleep(opts().client_retry_interval);
<< opts().retry_interval << " seconds";
sleep(opts().retry_interval);
check_time_limit(get_time());
continue;
}
Expand Down Expand Up @@ -1602,8 +1628,8 @@ static int parse_args(int argc, char **argv, options_t *test_opts)
test_opts->port_num = 1337;
test_opts->connect_timeout = 20.0;
test_opts->client_timeout = 50.0;
test_opts->client_retries = std::numeric_limits<long>::max();
test_opts->client_retry_interval = 5.0;
test_opts->retries = std::numeric_limits<long>::max();
test_opts->retry_interval = 5.0;
test_opts->client_runtime_limit = std::numeric_limits<double>::max();
test_opts->print_interval = 1.0;
test_opts->min_data_size = 4096;
Expand All @@ -1625,13 +1651,13 @@ static int parse_args(int argc, char **argv, options_t *test_opts)
break;
case 'c':
if (strcmp(optarg, "inf")) {
test_opts->client_retries = strtol(optarg, NULL, 0);
test_opts->retries = strtol(optarg, NULL, 0);
}
break;
case 'y':
if (set_time(optarg, &test_opts->client_retry_interval) != 0) {
if (set_time(optarg, &test_opts->retry_interval) != 0) {
std::cout << "invalid '" << optarg
<< "' value for client retry interval" << std::endl;
<< "' value for retry interval" << std::endl;
return -1;
}
break;
Expand Down Expand Up @@ -1749,9 +1775,10 @@ static int parse_args(int argc, char **argv, options_t *test_opts)
std::cout << " -k <chunk-size> Split the data transfer to chunks of this size" << std::endl;
std::cout << " -r <io-request-size> Size of IO request packet" << std::endl;
std::cout << " -t <client timeout> Client timeout (or \"inf\")" << std::endl;
std::cout << " -c <client retries> Number of connection retries on client" << std::endl;
std::cout << " -c <retries> Number of connection retries on client or " << std::endl;
std::cout << " listen retries on server" << std::endl;
std::cout << " (or \"inf\") for failure" << std::endl;
std::cout << " -y <client retry interval> Client retry interval" << std::endl;
std::cout << " -y <retry interval> Retry interval" << std::endl;
std::cout << " -l <client run-time limit> Time limit to run the IO client (or \"inf\")" << std::endl;
std::cout << " Examples: -l 17.5s; -l 10m; 15.5h" << std::endl;
std::cout << " -s <random seed> Random seed to use for randomizing" << std::endl;
Expand Down
7 changes: 3 additions & 4 deletions test/apps/iodemo/ucx_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class UcxContext {

void progress();

static const std::string sockaddr_str(const struct sockaddr* saddr,
size_t addrlen);

protected:

// Called when new IO message is received
Expand Down Expand Up @@ -138,10 +141,6 @@ class UcxContext {
void *data, size_t length,
const ucp_am_recv_param_t *param);


static const std::string sockaddr_str(const struct sockaddr* saddr,
size_t addrlen);

ucp_worker_h worker() const;

double connect_timeout() const;
Expand Down

0 comments on commit c197746

Please sign in to comment.