Skip to content

Commit

Permalink
Merge pull request #71 from yosefe/topic/rc-and-iodemo-fixes
Browse files Browse the repository at this point in the history
RC and iodemo fixes
  • Loading branch information
yosefe authored Oct 27, 2020
2 parents b02dae4 + e059cfb commit aa41625
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 54 deletions.
1 change: 1 addition & 0 deletions src/uct/ib/rc/accel/rc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef struct uct_rc_mlx5_ep {
uct_ib_mlx5_qp_t tm_qp;
uct_rc_mlx5_mp_context_t mp;
uint16_t atomic_mr_offset;
uint8_t connected;
} uct_rc_mlx5_ep_t;

typedef struct uct_rc_mlx5_ep_address {
Expand Down
15 changes: 12 additions & 3 deletions src/uct/ib/rc/accel/rc_mlx5_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,8 @@ static int uct_rc_mlx5_common_clean_tx_cq_cb(uct_rc_mlx5_iface_common_t *iface,
{
uct_rc_mlx5_common_clean_tx_cq_ctx_t *ctx = arg;

ucs_assert(iface->super.tx.cq_available >= -1);

if (cqe != NULL) {
uct_rc_mlx5_common_free_tx_res(&iface->super, ctx->txwq, ctx->txqp,
htons(cqe->wqe_counter));
Expand All @@ -1160,9 +1162,11 @@ static int uct_rc_mlx5_common_clean_tx_cq_cb(uct_rc_mlx5_iface_common_t *iface,
/* If not posted NOP already, and have the resources, post it to flush
* any unsignaled sends
*/
if (ctx->post_nop && (iface->super.tx.cq_available > 0) &&
(uct_rc_txqp_available(ctx->txqp) > 0))
{
if (ctx->post_nop && (uct_rc_txqp_available(ctx->txqp) > 0)) {
/* We reserved one CQ credit for NOP operation, so the lowest value
* cq_available can reach is -1 (after posting the nop)
*/
ucs_assert(iface->super.tx.cq_available >= 0);
ucs_trace("qp 0x%x: posted NOP", ctx->txwq->super.qp_num);
uct_rc_mlx5_common_post_nop(iface, ctx->txqp, ctx->txwq);
ucs_assert(uct_rc_txqp_unsignaled(ctx->txqp) == 0);
Expand All @@ -1183,6 +1187,11 @@ void uct_rc_mlx5_iface_commom_cq_clean_tx(uct_rc_mlx5_iface_common_t *iface,
};
uct_rc_mlx5_iface_commom_cq_clean(iface, UCT_IB_DIR_TX, txwq->super.qp_num,
uct_rc_mlx5_common_clean_tx_cq_cb, &ctx);

/* If NOP was posted and cq_available became -1, it must also be completed
* and restore cq_available to 0
*/
ucs_assert(iface->super.tx.cq_available >= 0);
}

void uct_rc_mlx5_iface_print(uct_rc_mlx5_iface_common_t *mlx5_iface,
Expand Down
8 changes: 5 additions & 3 deletions src/uct/ib/rc/accel/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,8 @@ ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
* messages are bundled with AM. */
ucs_assert(op == UCT_RC_EP_FC_PURE_GRANT);

if (ucs_unlikely(ep->tx.wq.super.verbs.qp->state != IBV_QPS_RTS)) {
return (ep->tx.wq.super.verbs.qp->state == IBV_QPS_INIT) ? UCS_OK :
UCS_ERR_CONNECTION_RESET;
if (!ep->connected) {
return UCS_OK;
}

UCT_RC_CHECK_RES(&iface->super, &ep->super);
Expand Down Expand Up @@ -703,6 +702,7 @@ ucs_status_t uct_rc_mlx5_ep_connect_to_ep(uct_ep_h tl_ep,
}

ep->atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id);
ep->connected = 1;

return UCS_OK;
}
Expand Down Expand Up @@ -927,6 +927,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, const uct_ep_params_t *params)

self->tx.wq.bb_max = ucs_min(self->tx.wq.bb_max, iface->tx.bb_max);
self->mp.free = 1;
self->connected = 0;

uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max);
return UCS_OK;

Expand Down
2 changes: 1 addition & 1 deletion src/uct/ib/rc/base/rc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ UCS_CLASS_INIT_FUNC(uct_rc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md,
UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker, params,
&config->super, init_attr);

self->tx.cq_available = init_attr->tx_cq_len - 1;
self->tx.cq_available = init_attr->tx_cq_len - 2; /* reserve for nop */
self->tx.cq_free = 0;
self->rx.srq.available = 0;
self->rx.srq.quota = 0;
Expand Down
53 changes: 22 additions & 31 deletions test/apps/iodemo/io_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ typedef struct {
template<class T, bool use_offcache = false>
class MemoryPool {
public:
MemoryPool(size_t buffer_size, size_t offcache = 0) :
_num_allocated(0), _buffer_size(buffer_size) {
MemoryPool(size_t buffer_size, const std::string& name, size_t offcache = 0) :
_num_allocated(0), _buffer_size(buffer_size), _name(name) {

for (size_t i = 0; i < offcache; ++i) {
_offcache_queue.push(get_free());
Expand All @@ -89,8 +89,8 @@ class MemoryPool {
}

if (_num_allocated != _free_stack.size()) {
LOG << "Some items were not freed. Total:" << _num_allocated
<< ", current:" << _free_stack.size() << ".";
LOG << (_num_allocated - _free_stack.size())
<< " buffers were not released from " << _name;
}

for (size_t i = 0; i < _free_stack.size(); i++) {
Expand Down Expand Up @@ -137,6 +137,7 @@ class MemoryPool {
std::queue<T*> _offcache_queue;
uint32_t _num_allocated;
size_t _buffer_size;
std::string _name;
};

/**
Expand Down Expand Up @@ -438,11 +439,12 @@ class P2pDemoCommon : public UcxContext {
P2pDemoCommon(const options_t& test_opts) :
UcxContext(test_opts.iomsg_size),
_test_opts(test_opts),
_io_msg_pool(test_opts.iomsg_size),
_send_callback_pool(0),
_io_msg_pool(test_opts.iomsg_size, "io messages"),
_send_callback_pool(0, "send callbacks"),
_data_buffers_pool(get_chunk_cnt(test_opts.max_data_size,
test_opts.chunk_size)),
_data_chunks_pool(test_opts.chunk_size, test_opts.num_offcache_buffers) {
test_opts.chunk_size), "data iovs"),
_data_chunks_pool(test_opts.chunk_size, "data chunks",
test_opts.num_offcache_buffers) {
}

const options_t& opts() const {
Expand Down Expand Up @@ -604,7 +606,7 @@ class DemoServer : public P2pDemoCommon {
} state_t;

DemoServer(const options_t& test_opts) :
P2pDemoCommon(test_opts), _callback_pool(0) {
P2pDemoCommon(test_opts), _callback_pool(0, "callbacks") {
_curr_state.read_count = 0;
_curr_state.write_count = 0;
_curr_state.active_conns = 0;
Expand Down Expand Up @@ -671,9 +673,14 @@ class DemoServer : public P2pDemoCommon {
recv_data(conn, *iov, msg->tr.sn, w);
}

virtual void dispatch_connection_accepted(UcxConnection* conn) {
++_curr_state.active_conns;
}

virtual void dispatch_connection_error(UcxConnection *conn) {
LOG << "deleting connection with status "
<< ucs_status_string(conn->ucx_status());
--_curr_state.active_conns;
delete conn;
}

Expand All @@ -699,24 +706,6 @@ class DemoServer : public P2pDemoCommon {
}

private:
virtual bool add_connection(UcxConnection *conn) {
bool added = P2pDemoCommon::add_connection(conn);
if (added) {
++_curr_state.active_conns;
}

return added;
}

virtual bool remove_connection(UcxConnection *conn) {
bool removed = P2pDemoCommon::remove_connection(conn);
if (removed) {
--_curr_state.active_conns;
}

return removed;
}

void save_prev_state() {
_prev_state = _curr_state;
}
Expand Down Expand Up @@ -812,7 +801,7 @@ class DemoClient : public P2pDemoCommon {
P2pDemoCommon(test_opts), _prev_connect_time(0),
_num_sent(0), _num_completed(0),
_status(OK), _start_time(get_time()),
_retry(0), _read_callback_pool(opts().iomsg_size) {
_retry(0), _read_callback_pool(opts().iomsg_size, "read callbacks") {
}

typedef enum {
Expand Down Expand Up @@ -927,13 +916,15 @@ class DemoClient : public P2pDemoCommon {
size_t server_index = get_server_index(conn);
server_info_t& server_info = _server_info[server_index];

// Don't wait from completions on this connection
_num_sent -= get_num_uncompleted(server_index);

// Remove connection pointer
_server_index_lookup.erase(conn);

// Destroying the connection will complete its outstanding operations
delete conn;

// Don't wait for any more completions on this connection
_num_sent -= get_num_uncompleted(server_index);

// Replace in _active_servers by the last element in the vector
size_t active_index = server_info.active_index;
std::swap(_active_servers[active_index], _active_servers.back());
Expand Down
24 changes: 11 additions & 13 deletions test/apps/iodemo/ucx_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ void UcxContext::progress_conn_requests()
UcxConnection *conn = new UcxConnection(*this, get_next_conn_id());
if (conn->accept(_conn_requests.front())) {
add_connection(conn);
dispatch_connection_accepted(conn);
} else {
delete conn;
}
Expand Down Expand Up @@ -342,27 +343,24 @@ void UcxContext::recv_io_message()
_iomsg_recv_request = reinterpret_cast<ucx_request*>(status_ptr);
}

bool UcxContext::add_connection(UcxConnection *conn)
void UcxContext::add_connection(UcxConnection *conn)
{
if (_conns.find(conn->id()) == _conns.end()) {
_conns[conn->id()] = conn;
return true;
} else {
return false;
}
assert(_conns.find(conn->id()) == _conns.end());
_conns[conn->id()] = conn;
}

bool UcxContext::remove_connection(UcxConnection *conn)
void UcxContext::remove_connection(UcxConnection *conn)
{
conn_map_t::iterator i = _conns.find(conn->id());
if (i == _conns.end()) {
return false;
} else {
if (i != _conns.end()) {
_conns.erase(i);
return true;
}
}

void UcxContext::dispatch_connection_accepted(UcxConnection* conn)
{
}

void UcxContext::handle_connection_error(UcxConnection *conn)
{
remove_connection(conn);
Expand Down Expand Up @@ -718,7 +716,7 @@ bool UcxConnection::process_request(const char *what,
return true;
} else if (UCS_PTR_IS_ERR(ptr_status)) {
status = UCS_PTR_STATUS(ptr_status);
UCX_CONN_LOG << what << "failed with status: "
UCX_CONN_LOG << what << " failed with status: "
<< ucs_status_string(status);
(*callback)(status);
return false;
Expand Down
9 changes: 6 additions & 3 deletions test/apps/iodemo/ucx_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ class UcxContext {
// Called when there is a fatal failure on the connection
virtual void dispatch_connection_error(UcxConnection* conn) = 0;

virtual bool add_connection(UcxConnection *conn);

virtual bool remove_connection(UcxConnection *conn);
// Called when new server connection is accepted
virtual void dispatch_connection_accepted(UcxConnection* conn);

private:
typedef enum {
Expand Down Expand Up @@ -138,6 +137,10 @@ class UcxContext {

void recv_io_message();

void add_connection(UcxConnection *conn);

void remove_connection(UcxConnection *conn);

void handle_connection_error(UcxConnection *conn);

void destroy_connections();
Expand Down

0 comments on commit aa41625

Please sign in to comment.