Skip to content

Commit

Permalink
Merge pull request #141 from dmitrygx/topic/iodemo/fix
Browse files Browse the repository at this point in the history
APPS/IODEMO: Fixes and improvements (backported)
  • Loading branch information
yosefe authored May 18, 2021
2 parents 74cbf08 + 780254c commit 742e547
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
31 changes: 22 additions & 9 deletions test/apps/iodemo/io_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ class DemoClient : public P2pDemoCommon {
UcxConnection* conn;
long retry_count; /* Connect retry counter */
double prev_connect_time; /* timestamp of last connect attempt */
long num_sent; /* Total number of sent operations */
size_t active_index; /* Index in active vector */
long num_sent[IO_OP_MAX]; /* Number of sent operations */
long num_completed[IO_OP_MAX]; /* Number of completed operations */
long prev_completed[IO_OP_MAX]; /* Completed in last report */
} server_info_t;
Expand Down Expand Up @@ -1006,12 +1006,12 @@ class DemoClient : public P2pDemoCommon {
return _server_info[server_index];
}

void commit_operation(size_t server_index) {
void commit_operation(size_t server_index, io_op_t op) {
server_info_t& server_info = _server_info[server_index];

assert(get_num_uncompleted(server_info) < opts().conn_window_size);

++server_info.num_sent;
++server_info.num_sent[op];
++_num_sent;
if (get_num_uncompleted(server_info) == opts().conn_window_size) {
active_servers_make_unused(server_info.active_index);
Expand Down Expand Up @@ -1045,7 +1045,7 @@ class DemoClient : public P2pDemoCommon {
BufferIov *iov = _data_buffers_pool.get();
IoReadResponseCallback *r = _read_callback_pool.get();

commit_operation(server_index);
commit_operation(server_index, IO_READ);

iov->init(data_size, _data_chunks_pool, sn, validate);
r->init(this, server_index, sn, validate, iov);
Expand All @@ -1069,7 +1069,7 @@ class DemoClient : public P2pDemoCommon {
BufferIov *iov = _data_buffers_pool.get();
SendCompleteCallback *cb = _send_callback_pool.get();

commit_operation(server_index);
commit_operation(server_index, IO_WRITE);

iov->init(data_size, _data_chunks_pool, sn, validate);
cb->init(iov, NULL);
Expand Down Expand Up @@ -1120,7 +1120,8 @@ class DemoClient : public P2pDemoCommon {
}

static long get_num_uncompleted(const server_info_t& server_info) {
return server_info.num_sent -
return (server_info.num_sent[IO_READ] +
server_info.num_sent[IO_WRITE]) -
(server_info.num_completed[IO_READ] +
server_info.num_completed[IO_WRITE]);
}
Expand All @@ -1132,8 +1133,8 @@ class DemoClient : public P2pDemoCommon {

static void reset_server_info(server_info_t& server_info) {
server_info.conn = NULL;
server_info.num_sent = 0;
for (int op = 0; op < IO_OP_MAX; ++op) {
server_info.num_sent[op] = 0;
server_info.num_completed[op] = 0;
server_info.prev_completed[op] = 0;
}
Expand All @@ -1151,11 +1152,23 @@ class DemoClient : public P2pDemoCommon {
server_info_t& server_info = _server_info[server_index];

if (server_info.conn->is_disconnecting()) {
LOG << "not disconnecting " << server_info.conn << " with "
<< get_num_uncompleted(server_info) << " uncompleted operations"
" (read: " << server_info.num_completed[IO_READ] << "/"
<< server_info.num_sent[IO_READ] << "; write: "
<< server_info.num_completed[IO_WRITE] << "/"
<< server_info.num_sent[IO_WRITE] << ") due to \"" << reason
<< "\" because disconnection is already in progress";
return;
}

LOG << "disconnecting connection " << server_info.conn << " due to "
<< reason;
LOG << "disconnecting connection " << server_info.conn << " with "
<< get_num_uncompleted(server_info) << " uncompleted operations"
" (read: " << server_info.num_completed[IO_READ] << "/"
<< server_info.num_sent[IO_READ] << "; write: "
<< server_info.num_completed[IO_WRITE] << "/"
<< server_info.num_sent[IO_WRITE] << ") due to \"" << reason
<< "\"";

// Destroying the connection will complete its outstanding operations
server_info.conn->disconnect(new DisconnectCallback(*this,
Expand Down
24 changes: 18 additions & 6 deletions test/apps/iodemo/ucx_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ void UcxContext::request_reset(ucx_request *r)
r->completed = false;
r->callback = NULL;
r->conn = NULL;
r->status = UCS_OK;
r->recv_length = 0;
r->pos.next = NULL;
r->pos.prev = NULL;
Expand Down Expand Up @@ -453,13 +454,16 @@ void UcxContext::add_connection(UcxConnection *conn)
{
assert(_conns.find(conn->id()) == _conns.end());
_conns[conn->id()] = conn;
UCX_LOG << "added " << conn->get_log_prefix() << " to connection map";
}

void UcxContext::remove_connection(UcxConnection *conn)
{
conn_map_t::iterator i = _conns.find(conn->id());
if (i != _conns.end()) {
_conns.erase(i);
UCX_LOG << "removed " << conn->get_log_prefix()
<< " from connection map";
}
}

Expand Down Expand Up @@ -712,11 +716,10 @@ void UcxConnection::cancel_all()
ucx_request *request, *tmp;
unsigned count = 0;
ucs_list_for_each_safe(request, tmp, &_all_requests, pos) {
ucp_request_cancel(_context.worker(), request);
++count;
UCX_CONN_LOG << "canceling " << request << " request #" << count;
ucp_request_cancel(_context.worker(), request);
}

UCX_CONN_LOG << "canceling " << count << " requests ";
}

ucp_tag_t UcxConnection::make_data_tag(uint32_t conn_id, uint32_t sn)
Expand Down Expand Up @@ -755,6 +758,8 @@ void UcxConnection::common_request_callback(void *request, ucs_status_t status)
ucx_request *r = reinterpret_cast<ucx_request*>(request);

assert(!r->completed);
r->status = status;

if (r->callback) {
// already processed by send/recv function
(*r->callback)(status);
Expand All @@ -763,7 +768,6 @@ void UcxConnection::common_request_callback(void *request, ucs_status_t status)
} else {
// not yet processed by "process_request"
r->completed = true;
r->status = status;
}
}

Expand Down Expand Up @@ -872,6 +876,7 @@ void UcxConnection::connect_common(ucp_ep_params_t &ep_params,
<< _conn_id;

connect_tag(callback);
_context.add_connection(this);
}

void UcxConnection::established(ucs_status_t status)
Expand Down Expand Up @@ -913,8 +918,15 @@ void UcxConnection::request_completed(ucx_request *r)
{
assert(r->conn == this);
ucs_list_del(&r->pos);
if (ucs_list_is_empty(&_all_requests) && (_disconnect_cb != NULL)) {
_context.move_connection_to_disconnecting(this);

if (_disconnect_cb != NULL) {
UCX_CONN_LOG << "completing request " << r << " with status \""
<< ucs_status_string(r->status) << "\" (" << r->status
<< ")" << " during disconnect";

if (ucs_list_is_empty(&_all_requests)) {
_context.move_connection_to_disconnecting(this);
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions test/apps/iodemo/ucx_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ class UcxConnection {
return _ucx_status;
}

const char* get_log_prefix() const {
return _log_prefix;
}

bool is_established() const {
return _establish_cb == NULL;
}
Expand Down

0 comments on commit 742e547

Please sign in to comment.