From 8bad24ac71022f3b053aee70aa86554f50437a06 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 27 Oct 2020 01:08:40 +0200 Subject: [PATCH 1/4] TEST/IODEMO: Simplify connection counting on server side --- test/apps/iodemo/io_demo.cc | 23 +++++------------------ test/apps/iodemo/ucx_wrapper.cc | 24 +++++++++++------------- test/apps/iodemo/ucx_wrapper.h | 9 ++++++--- 3 files changed, 22 insertions(+), 34 deletions(-) diff --git a/test/apps/iodemo/io_demo.cc b/test/apps/iodemo/io_demo.cc index 2af9c50cf21..c89479c95fa 100644 --- a/test/apps/iodemo/io_demo.cc +++ b/test/apps/iodemo/io_demo.cc @@ -671,9 +671,14 @@ class DemoServer : public P2pDemoCommon { recv_data(conn, *iov, msg->tr.sn, w); } + virtual void dispatch_connection_accepted(UcxConnection* conn) { + ++_curr_state.active_conns; + } + virtual void dispatch_connection_error(UcxConnection *conn) { LOG << "deleting connection with status " << ucs_status_string(conn->ucx_status()); + --_curr_state.active_conns; delete conn; } @@ -699,24 +704,6 @@ class DemoServer : public P2pDemoCommon { } private: - virtual bool add_connection(UcxConnection *conn) { - bool added = P2pDemoCommon::add_connection(conn); - if (added) { - ++_curr_state.active_conns; - } - - return added; - } - - virtual bool remove_connection(UcxConnection *conn) { - bool removed = P2pDemoCommon::remove_connection(conn); - if (removed) { - --_curr_state.active_conns; - } - - return removed; - } - void save_prev_state() { _prev_state = _curr_state; } diff --git a/test/apps/iodemo/ucx_wrapper.cc b/test/apps/iodemo/ucx_wrapper.cc index 5500b3d333f..64d4a0022dd 100644 --- a/test/apps/iodemo/ucx_wrapper.cc +++ b/test/apps/iodemo/ucx_wrapper.cc @@ -256,6 +256,7 @@ void UcxContext::progress_conn_requests() UcxConnection *conn = new UcxConnection(*this, get_next_conn_id()); if (conn->accept(_conn_requests.front())) { add_connection(conn); + dispatch_connection_accepted(conn); } else { delete conn; } @@ -342,27 +343,24 @@ void 
UcxContext::recv_io_message() _iomsg_recv_request = reinterpret_cast(status_ptr); } -bool UcxContext::add_connection(UcxConnection *conn) +void UcxContext::add_connection(UcxConnection *conn) { - if (_conns.find(conn->id()) == _conns.end()) { - _conns[conn->id()] = conn; - return true; - } else { - return false; - } + assert(_conns.find(conn->id()) == _conns.end()); + _conns[conn->id()] = conn; } -bool UcxContext::remove_connection(UcxConnection *conn) +void UcxContext::remove_connection(UcxConnection *conn) { conn_map_t::iterator i = _conns.find(conn->id()); - if (i == _conns.end()) { - return false; - } else { + if (i != _conns.end()) { _conns.erase(i); - return true; } } +void UcxContext::dispatch_connection_accepted(UcxConnection* conn) +{ +} + void UcxContext::handle_connection_error(UcxConnection *conn) { remove_connection(conn); @@ -718,7 +716,7 @@ bool UcxConnection::process_request(const char *what, return true; } else if (UCS_PTR_IS_ERR(ptr_status)) { status = UCS_PTR_STATUS(ptr_status); - UCX_CONN_LOG << what << "failed with status: " + UCX_CONN_LOG << what << " failed with status: " << ucs_status_string(status); (*callback)(status); return false; diff --git a/test/apps/iodemo/ucx_wrapper.h b/test/apps/iodemo/ucx_wrapper.h index ee40360fc5f..943a5bbfbde 100644 --- a/test/apps/iodemo/ucx_wrapper.h +++ b/test/apps/iodemo/ucx_wrapper.h @@ -94,9 +94,8 @@ class UcxContext { // Called when there is a fatal failure on the connection virtual void dispatch_connection_error(UcxConnection* conn) = 0; - virtual bool add_connection(UcxConnection *conn); - - virtual bool remove_connection(UcxConnection *conn); + // Called when new server connection is accepted + virtual void dispatch_connection_accepted(UcxConnection* conn); private: typedef enum { @@ -138,6 +137,10 @@ class UcxContext { void recv_io_message(); + void add_connection(UcxConnection *conn); + + void remove_connection(UcxConnection *conn); + void handle_connection_error(UcxConnection *conn); void 
destroy_connections(); From 451d7469de102b430712e39754a1c2b379aa27c7 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 27 Oct 2020 01:09:19 +0200 Subject: [PATCH 2/4] UCT/RC: Fix keepalive being disabled with DevX When QP is modified by DevX, the 'state' field may not be updated --- src/uct/ib/rc/accel/rc_mlx5.h | 1 + src/uct/ib/rc/accel/rc_mlx5_ep.c | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/uct/ib/rc/accel/rc_mlx5.h b/src/uct/ib/rc/accel/rc_mlx5.h index c4c963d94e3..388ee2f2148 100644 --- a/src/uct/ib/rc/accel/rc_mlx5.h +++ b/src/uct/ib/rc/accel/rc_mlx5.h @@ -34,6 +34,7 @@ typedef struct uct_rc_mlx5_ep { uct_ib_mlx5_qp_t tm_qp; uct_rc_mlx5_mp_context_t mp; uint16_t atomic_mr_offset; + uint8_t connected; } uct_rc_mlx5_ep_t; typedef struct uct_rc_mlx5_ep_address { diff --git a/src/uct/ib/rc/accel/rc_mlx5_ep.c b/src/uct/ib/rc/accel/rc_mlx5_ep.c index 5278df2b3ff..e7b37b7725e 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_ep.c +++ b/src/uct/ib/rc/accel/rc_mlx5_ep.c @@ -579,9 +579,8 @@ ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, * messages are bundled with AM. */ ucs_assert(op == UCT_RC_EP_FC_PURE_GRANT); - if (ucs_unlikely(ep->tx.wq.super.verbs.qp->state != IBV_QPS_RTS)) { - return (ep->tx.wq.super.verbs.qp->state == IBV_QPS_INIT) ? 
UCS_OK : - UCS_ERR_CONNECTION_RESET; + if (!ep->connected) { + return UCS_OK; } UCT_RC_CHECK_RES(&iface->super, &ep->super); @@ -703,6 +702,7 @@ ucs_status_t uct_rc_mlx5_ep_connect_to_ep(uct_ep_h tl_ep, } ep->atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id); + ep->connected = 1; return UCS_OK; } @@ -927,6 +927,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, const uct_ep_params_t *params) self->tx.wq.bb_max = ucs_min(self->tx.wq.bb_max, iface->tx.bb_max); self->mp.free = 1; + self->connected = 0; + uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max); return UCS_OK; From c5a0840169fcf0df26957e93d53735289c2dbba0 Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 27 Oct 2020 01:11:46 +0200 Subject: [PATCH 3/4] UCT/RC: Fix QP cleanup with tx_moderation enabled Reserve a CQ entry for the NOP operation during RC QP flush. Only one is needed since this is a blocking operation. This removes the need to disable TX moderation to avoid a deadlock during flush due to a lack of CQ resources to post a single NOP (which is needed to generate completions for unsignaled WQEs). --- src/uct/ib/rc/accel/rc_mlx5_common.c | 15 ++++++++++++--- src/uct/ib/rc/base/rc_iface.c | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.c b/src/uct/ib/rc/accel/rc_mlx5_common.c index f1f3c168721..96180054ed6 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_common.c +++ b/src/uct/ib/rc/accel/rc_mlx5_common.c @@ -1148,6 +1148,8 @@ static int uct_rc_mlx5_common_clean_tx_cq_cb(uct_rc_mlx5_iface_common_t *iface, { uct_rc_mlx5_common_clean_tx_cq_ctx_t *ctx = arg; + ucs_assert(iface->super.tx.cq_available >= -1); + if (cqe != NULL) { uct_rc_mlx5_common_free_tx_res(&iface->super, ctx->txwq, ctx->txqp, htons(cqe->wqe_counter)); @@ -1160,9 +1162,11 @@ static int uct_rc_mlx5_common_clean_tx_cq_cb(uct_rc_mlx5_iface_common_t *iface, /* If not posted NOP already, and have the resources, post it to flush * any unsignaled sends */ - if (ctx->post_nop && 
(iface->super.tx.cq_available > 0) && - (uct_rc_txqp_available(ctx->txqp) > 0)) - { + if (ctx->post_nop && (uct_rc_txqp_available(ctx->txqp) > 0)) { + /* We reserved one CQ credit for NOP operation, so the lowest value + * cq_available can reach is -1 (after posting the nop) + */ + ucs_assert(iface->super.tx.cq_available >= 0); ucs_trace("qp 0x%x: posted NOP", ctx->txwq->super.qp_num); uct_rc_mlx5_common_post_nop(iface, ctx->txqp, ctx->txwq); ucs_assert(uct_rc_txqp_unsignaled(ctx->txqp) == 0); @@ -1183,6 +1187,11 @@ void uct_rc_mlx5_iface_commom_cq_clean_tx(uct_rc_mlx5_iface_common_t *iface, }; uct_rc_mlx5_iface_commom_cq_clean(iface, UCT_IB_DIR_TX, txwq->super.qp_num, uct_rc_mlx5_common_clean_tx_cq_cb, &ctx); + + /* If NOP was posted and cq_available became -1, it must also be completed + * and restore cq_available to 0 + */ + ucs_assert(iface->super.tx.cq_available >= 0); } void uct_rc_mlx5_iface_print(uct_rc_mlx5_iface_common_t *mlx5_iface, diff --git a/src/uct/ib/rc/base/rc_iface.c b/src/uct/ib/rc/base/rc_iface.c index 19a3a47c100..682b016ba40 100644 --- a/src/uct/ib/rc/base/rc_iface.c +++ b/src/uct/ib/rc/base/rc_iface.c @@ -553,7 +553,7 @@ UCS_CLASS_INIT_FUNC(uct_rc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md, UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker, params, &config->super, init_attr); - self->tx.cq_available = init_attr->tx_cq_len - 1; + self->tx.cq_available = init_attr->tx_cq_len - 2; /* reserve for nop */ self->tx.cq_free = 0; self->rx.srq.available = 0; self->rx.srq.quota = 0; From e059cfbaf0b240c64baf18accff794ce4cb64fbf Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 27 Oct 2020 01:58:36 +0200 Subject: [PATCH 4/4] TEST/IODEMO: Fix increased memory usage due to invalid outstanding count The client incorrectly counts the number of outstanding ops: when a connection is closed, _num_sent is decreased, but it should not be, since read requests also complete with error status and _num_completed is increased. 
As a result, the client sends more and more outstanding operations after every terminated connection, which causes an increase in memory usage. The fix is to first close the connection, which will complete its outstanding read requests, and then restore the remaining credits for write requests, which will never complete on their own because they are waiting for a write-completion io-message. --- test/apps/iodemo/io_demo.cc | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/test/apps/iodemo/io_demo.cc b/test/apps/iodemo/io_demo.cc index c89479c95fa..e2d2c9fc62f 100644 --- a/test/apps/iodemo/io_demo.cc +++ b/test/apps/iodemo/io_demo.cc @@ -74,8 +74,8 @@ typedef struct { template class MemoryPool { public: - MemoryPool(size_t buffer_size, size_t offcache = 0) : - _num_allocated(0), _buffer_size(buffer_size) { + MemoryPool(size_t buffer_size, const std::string& name, size_t offcache = 0) : + _num_allocated(0), _buffer_size(buffer_size), _name(name) { for (size_t i = 0; i < offcache; ++i) { _offcache_queue.push(get_free()); @@ -89,8 +89,8 @@ class MemoryPool { } if (_num_allocated != _free_stack.size()) { - LOG << "Some items were not freed. 
Total:" << _num_allocated - << ", current:" << _free_stack.size() << "."; + LOG << (_num_allocated - _free_stack.size()) + << " buffers were not released from " << _name; } for (size_t i = 0; i < _free_stack.size(); i++) { @@ -137,6 +137,7 @@ class MemoryPool { std::queue _offcache_queue; uint32_t _num_allocated; size_t _buffer_size; + std::string _name; }; /** @@ -438,11 +439,12 @@ class P2pDemoCommon : public UcxContext { P2pDemoCommon(const options_t& test_opts) : UcxContext(test_opts.iomsg_size), _test_opts(test_opts), - _io_msg_pool(test_opts.iomsg_size), - _send_callback_pool(0), + _io_msg_pool(test_opts.iomsg_size, "io messages"), + _send_callback_pool(0, "send callbacks"), _data_buffers_pool(get_chunk_cnt(test_opts.max_data_size, - test_opts.chunk_size)), - _data_chunks_pool(test_opts.chunk_size, test_opts.num_offcache_buffers) { + test_opts.chunk_size), "data iovs"), + _data_chunks_pool(test_opts.chunk_size, "data chunks", + test_opts.num_offcache_buffers) { } const options_t& opts() const { @@ -604,7 +606,7 @@ class DemoServer : public P2pDemoCommon { } state_t; DemoServer(const options_t& test_opts) : - P2pDemoCommon(test_opts), _callback_pool(0) { + P2pDemoCommon(test_opts), _callback_pool(0, "callbacks") { _curr_state.read_count = 0; _curr_state.write_count = 0; _curr_state.active_conns = 0; @@ -799,7 +801,7 @@ class DemoClient : public P2pDemoCommon { P2pDemoCommon(test_opts), _prev_connect_time(0), _num_sent(0), _num_completed(0), _status(OK), _start_time(get_time()), - _retry(0), _read_callback_pool(opts().iomsg_size) { + _retry(0), _read_callback_pool(opts().iomsg_size, "read callbacks") { } typedef enum { @@ -914,13 +916,15 @@ class DemoClient : public P2pDemoCommon { size_t server_index = get_server_index(conn); server_info_t& server_info = _server_info[server_index]; - // Don't wait from completions on this connection - _num_sent -= get_num_uncompleted(server_index); - // Remove connection pointer _server_index_lookup.erase(conn); + + // 
Destroying the connection will complete its outstanding operations delete conn; + // Don't wait for any more completions on this connection + _num_sent -= get_num_uncompleted(server_index); + // Replace in _active_servers by the last element in the vector size_t active_index = server_info.active_index; std::swap(_active_servers[active_index], _active_servers.back());