diff --git a/contrib/valgrind.supp b/contrib/valgrind.supp index 69a7789a4ebf..98136d33d459 100644 --- a/contrib/valgrind.supp +++ b/contrib/valgrind.supp @@ -80,6 +80,20 @@ ... fun:gdr_copy_to_bar } +{ + ibv_exp_reg_mr + Memcheck:Param + write(buf) + ... + fun:ibv_exp_reg_mr +} +{ + ibv_exp_free_dm + Memcheck:Param + write(buf) + ... + fun:ibv_exp_free_dm +} { res_domain_leak Memcheck:Leak diff --git a/src/tools/perf/libperf.c b/src/tools/perf/libperf.c index a3d83d67d5ec..39b34cbf9c01 100644 --- a/src/tools/perf/libperf.c +++ b/src/tools/perf/libperf.c @@ -950,7 +950,7 @@ static ucs_status_t ucp_perf_test_exchange_status(ucx_perf_context_t *perf, ucs_status_t status) { unsigned group_size = rte_call(perf, group_size); - ucs_status_t collective_status = UCS_OK; + ucs_status_t collective_status = status; struct iovec vec; void *req = NULL; unsigned i; @@ -1218,6 +1218,8 @@ static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf, ucx_perf_params_t * } status = uct_perf_test_check_capabilities(params, perf->uct.iface); + /* sync status across all processes */ + status = ucp_perf_test_exchange_status(perf, status); if (status != UCS_OK) { goto out_iface_close; } diff --git a/src/ucs/config/parser.c b/src/ucs/config/parser.c index 7d36c4f3a62a..b878799a2093 100644 --- a/src/ucs/config/parser.c +++ b/src/ucs/config/parser.c @@ -1339,10 +1339,9 @@ UCS_STATIC_INIT { } UCS_STATIC_CLEANUP { - int UCS_V_UNUSED dummy; const char *key; - kh_foreach(&ucs_config_parser_env_vars, key, dummy, { + kh_foreach_key(&ucs_config_parser_env_vars, key, { ucs_free((void*)key); }) kh_destroy_inplace(ucs_config_env_vars, &ucs_config_parser_env_vars); diff --git a/src/ucs/datastruct/khash.h b/src/ucs/datastruct/khash.h index dcc923770787..b4fb3da44deb 100644 --- a/src/ucs/datastruct/khash.h +++ b/src/ucs/datastruct/khash.h @@ -591,6 +591,19 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) code; \ } } +/*! @function + @abstract Iterate over the keys in the hash table + @param h Pointer to the hash table [khash_t(name)*] + @param kvar Variable to which key will be assigned + @param code Block of code to execute + */ +#define kh_foreach_key(h, kvar, code) { khint_t __i; \ + for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \ + if (!kh_exist(h,__i)) continue; \ + (kvar) = kh_key(h,__i); \ + code; \ + } } + /*! 
@function @abstract Iterate over the values in the hash table @param h Pointer to the hash table [khash_t(name)*] diff --git a/src/uct/ib/dc/accel/dc_mlx5.c b/src/uct/ib/dc/accel/dc_mlx5.c index ff8883d9e13a..071c6b391a0a 100644 --- a/src/uct/ib/dc/accel/dc_mlx5.c +++ b/src/uct/ib/dc/accel/dc_mlx5.c @@ -120,11 +120,22 @@ static void uct_dc_mlx5_ep_destroy(uct_ep_h tl_ep) static ucs_status_t uct_dc_mlx5_iface_query(uct_iface_h tl_iface, uct_iface_attr_t *iface_attr) { uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t); + size_t max_am_inline = UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE); + size_t max_put_inline = UCT_IB_MLX5_PUT_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE); ucs_status_t status; +#if HAVE_IBV_EXP_DM + if (iface->mlx5_common.dm.dm != NULL) { + max_am_inline = ucs_max(iface->mlx5_common.dm.dm->seg_len, + UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)); + max_put_inline = ucs_max(iface->mlx5_common.dm.dm->seg_len, + UCT_IB_MLX5_PUT_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)); + } +#endif + status = uct_dc_iface_query(&iface->super, iface_attr, - UCT_IB_MLX5_PUT_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE), - UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE), + max_put_inline, + max_am_inline, UCT_IB_MLX5_AM_ZCOPY_MAX_HDR(UCT_IB_MLX5_AV_FULL_SIZE), UCT_IB_MLX5_AM_ZCOPY_MAX_IOV); if (status != UCS_OK) { @@ -152,19 +163,21 @@ uct_dc_mlx5_iface_bcopy_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep, unsigned opcode, unsigned length, /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey, uct_rc_iface_send_desc_t *desc, uint8_t send_flags, - uint32_t imm_val_be) + uint32_t imm_val_be, const void *buffer, + uct_ib_log_sge_t *log_sge) { UCT_DC_MLX5_TXQP_DECL(txqp, txwq); UCT_DC_MLX5_IFACE_TXQP_GET(iface, &ep->super, txqp, txwq); desc->super.sn = txwq->sw_pi; uct_rc_mlx5_txqp_dptr_post(&iface->super.super, IBV_EXP_QPT_DC_INI, txqp, txwq, - opcode, desc + 1, length, &desc->lkey, + opcode, buffer, length, &desc->lkey, rdma_raddr, uct_ib_md_direct_rkey(rdma_rkey), 0, 0, 0, &ep->av, uct_dc_mlx5_ep_get_grh(ep), uct_ib_mlx5_wqe_av_size(&ep->av), - MLX5_WQE_CTRL_CQ_UPDATE | send_flags, imm_val_be, INT_MAX); + MLX5_WQE_CTRL_CQ_UPDATE | send_flags, imm_val_be, INT_MAX, + log_sge); uct_rc_txqp_add_send_op(txqp, &desc->super); } @@ -215,7 +228,7 @@ uct_dc_mlx5_iface_atomic_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep, compare_mask, compare, swap_add, &ep->av, uct_dc_mlx5_ep_get_grh(ep), uct_ib_mlx5_wqe_av_size(&ep->av), - MLX5_WQE_CTRL_CQ_UPDATE, 0, INT_MAX); + MLX5_WQE_CTRL_CQ_UPDATE, 0, INT_MAX, NULL); UCT_TL_EP_STAT_ATOMIC(&ep->super.super); uct_rc_txqp_add_send_op(txqp, &desc->super); @@ -339,8 +352,9 @@ ucs_status_t uct_dc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep, uint32_t compare, uin htonl(compare), htonl(swap), comp); } -ucs_status_t uct_dc_mlx5_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr, - const void *buffer, unsigned length) +static ucs_status_t UCS_F_ALWAYS_INLINE +uct_dc_mlx5_ep_am_short_inline(uct_ep_h tl_ep, uint8_t id, uint64_t hdr, + const void *buffer, unsigned length) { uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); @@ -365,6 +379,72 @@ ucs_status_t uct_dc_mlx5_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr, return UCS_OK; } +#if HAVE_IBV_EXP_DM +static ucs_status_t UCS_F_ALWAYS_INLINE +uct_dc_mlx5_ep_short_dm(uct_dc_mlx5_ep_t *ep, uct_rc_mlx5_dm_copy_data_t *cache, + size_t hdr_len, const void *payload, unsigned length, + unsigned opcode, uint8_t 
fm_ce_se, + uint64_t rdma_raddr, uct_rkey_t rdma_rkey) +{ + uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.super.iface, uct_dc_mlx5_iface_t); + uct_rc_iface_send_desc_t *desc; + void *buffer; + ucs_status_t status; + uct_ib_log_sge_t log_sge; + + status = uct_rc_mlx5_common_dm_make_data(&iface->mlx5_common, &iface->super.super, + cache, hdr_len, payload, length, &desc, + &buffer, &log_sge); + if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) { + return status; + } + + uct_dc_mlx5_iface_bcopy_post(iface, ep, opcode, + hdr_len + length, + rdma_raddr, rdma_rkey, + desc, fm_ce_se, 0, buffer, + log_sge.num_sge ? &log_sge : NULL); + return UCS_OK; +} +#endif + +ucs_status_t uct_dc_mlx5_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr, + const void *buffer, unsigned length) +{ +#if HAVE_IBV_EXP_DM + uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); + uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); + ucs_status_t status; + uct_rc_mlx5_dm_copy_data_t cache; + + if (ucs_likely((sizeof(uct_rc_am_short_hdr_t) + length <= + UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) || + !iface->mlx5_common.dm.dm)) { +#endif + return uct_dc_mlx5_ep_am_short_inline(tl_ep, id, hdr, buffer, length); +#if HAVE_IBV_EXP_DM + } + + UCT_CHECK_LENGTH(length + sizeof(uct_rc_am_short_hdr_t), 0, + iface->mlx5_common.dm.seg_len, "am_short"); + UCT_DC_CHECK_RES_AND_FC(&iface->super, &ep->super); + + uct_rc_am_hdr_fill(&cache.am_hdr.rc_hdr, id); + cache.am_hdr.am_hdr = hdr; + + status = uct_dc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.am_hdr), buffer, length, + MLX5_OPCODE_SEND, + MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE, + 0, 0); + if (UCS_STATUS_IS_ERR(status)) { + return status; + } + UCT_TL_EP_STAT_OP(&ep->super.super, AM, SHORT, sizeof(cache.am_hdr) + length); + UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->super.fc); + return UCS_OK; +#endif +} + ssize_t uct_dc_mlx5_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id, uct_pack_callback_t pack_cb, void *arg, unsigned flags) @@ -380,7 +460,7 @@ ssize_t uct_dc_mlx5_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id, uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_SEND, sizeof(uct_rc_hdr_t) + length, 0, 0, desc, - MLX5_WQE_CTRL_SOLICITED, 0); + MLX5_WQE_CTRL_SOLICITED, 0, desc + 1, NULL); UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->super.fc); UCT_TL_EP_STAT_OP(&ep->super.super, AM, BCOPY, length); @@ -415,9 +495,10 @@ ucs_status_t uct_dc_mlx5_ep_am_zcopy(uct_ep_h tl_ep, uint8_t id, const void *hea } -ucs_status_t uct_dc_mlx5_ep_put_short(uct_ep_h tl_ep, const void *buffer, - unsigned length, uint64_t remote_addr, - uct_rkey_t rkey) +static ucs_status_t UCS_F_ALWAYS_INLINE +uct_dc_mlx5_ep_put_short_inline(uct_ep_h tl_ep, const void *buffer, + unsigned length, uint64_t remote_addr, + uct_rkey_t rkey) { uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); @@ -440,6 +521,36 @@ ucs_status_t uct_dc_mlx5_ep_put_short(uct_ep_h tl_ep, const void *buffer, return UCS_OK; } +ucs_status_t uct_dc_mlx5_ep_put_short(uct_ep_h tl_ep, const void *payload, + unsigned length, uint64_t remote_addr, + uct_rkey_t rkey) +{ +#if HAVE_IBV_EXP_DM + uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); + uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); + ucs_status_t status; + + if (ucs_likely((length <= UCT_IB_MLX5_PUT_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) || + !iface->mlx5_common.dm.dm)) { +#endif + return 
uct_dc_mlx5_ep_put_short_inline(tl_ep, payload, length, remote_addr, rkey); +#if HAVE_IBV_EXP_DM + } + + UCT_CHECK_LENGTH(length, 0, iface->mlx5_common.dm.seg_len, "put_short"); + UCT_DC_CHECK_RES(&iface->super, &ep->super); + status = uct_dc_mlx5_ep_short_dm(ep, NULL, 0, payload, length, + MLX5_OPCODE_RDMA_WRITE, + MLX5_WQE_CTRL_CQ_UPDATE, + remote_addr, rkey); + if (UCS_STATUS_IS_ERR(status)) { + return status; + } + UCT_TL_EP_STAT_OP(&ep->super.super, PUT, SHORT, length); + return UCS_OK; +#endif +} + ssize_t uct_dc_mlx5_ep_put_bcopy(uct_ep_h tl_ep, uct_pack_callback_t pack_cb, void *arg, uint64_t remote_addr, uct_rkey_t rkey) { @@ -452,7 +563,7 @@ ssize_t uct_dc_mlx5_ep_put_bcopy(uct_ep_h tl_ep, uct_pack_callback_t pack_cb, UCT_RC_IFACE_GET_TX_PUT_BCOPY_DESC(&iface->super.super, &iface->super.super.tx.mp, desc, pack_cb, arg, length); uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_RDMA_WRITE, length, - remote_addr, rkey, desc, 0, 0); + remote_addr, rkey, desc, 0, 0, desc + 1, NULL); UCT_TL_EP_STAT_OP(&ep->super.super, PUT, BCOPY, length); return length; } @@ -494,7 +605,7 @@ ucs_status_t uct_dc_mlx5_ep_get_bcopy(uct_ep_h tl_ep, UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC(&iface->super.super, &iface->super.super.tx.mp, desc, unpack_cb, comp, arg, length); uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_RDMA_READ, length, - remote_addr, rkey, desc, 0, 0); + remote_addr, rkey, desc, 0, 0, desc + 1, NULL); UCT_TL_EP_STAT_OP(&ep->super.super, GET, BCOPY, length); return UCS_INPROGRESS; } @@ -593,8 +704,9 @@ static unsigned uct_dc_mlx5_iface_progress(void *arg) #if IBV_EXP_HW_TM_DC -ucs_status_t uct_dc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag, - const void *data, size_t length) +static ucs_status_t UCS_F_ALWAYS_INLINE +uct_dc_mlx5_ep_tag_eager_short_inline(uct_ep_h tl_ep, uct_tag_t tag, + const void *data, size_t length) { uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); @@ -617,6 +729,35 @@ ucs_status_t uct_dc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag, return UCS_OK; } +ucs_status_t uct_dc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag, + const void *data, size_t length) +{ +#if HAVE_IBV_EXP_DM + uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t); + uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t); + uct_rc_mlx5_dm_copy_data_t cache; + + if (ucs_likely((sizeof(struct ibv_exp_tmh) + length <= + UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) || + !iface->mlx5_common.dm.dm)) { +#endif + return uct_dc_mlx5_ep_tag_eager_short_inline(tl_ep, tag, data, length); +#if HAVE_IBV_EXP_DM + } + + UCT_CHECK_LENGTH(length + sizeof(struct ibv_exp_tmh), 0, + iface->mlx5_common.dm.seg_len, "tag_short"); + UCT_DC_CHECK_RES(&iface->super, &ep->super); + + uct_rc_iface_fill_tmh(ucs_unaligned_ptr(&cache.tm_hdr), tag, 0, IBV_EXP_TMH_EAGER); + + return uct_dc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.tm_hdr), data, length, + MLX5_OPCODE_SEND, + MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE, + 0, 0); +#endif +} + ssize_t uct_dc_mlx5_ep_tag_eager_bcopy(uct_ep_h tl_ep, uct_tag_t tag, uint64_t imm, uct_pack_callback_t pack_cb, @@ -638,7 +779,7 @@ ssize_t uct_dc_mlx5_ep_tag_eager_bcopy(uct_ep_h tl_ep, uct_tag_t tag, uct_dc_mlx5_iface_bcopy_post(iface, ep, opcode, sizeof(struct ibv_exp_tmh) + length, - 0, 0, desc, MLX5_WQE_CTRL_SOLICITED, ib_imm); + 0, 0, desc, MLX5_WQE_CTRL_SOLICITED, ib_imm, desc + 1, NULL); return length; } diff --git 
a/src/uct/ib/mlx5/ib_mlx5_log.c b/src/uct/ib/mlx5/ib_mlx5_log.c index c7e90e51f07c..007854a6c32d 100644 --- a/src/uct/ib/mlx5/ib_mlx5_log.c +++ b/src/uct/ib/mlx5/ib_mlx5_log.c @@ -192,7 +192,7 @@ static size_t uct_ib_mlx5_dump_dgram(char *buf, size_t max, void *seg) static void uct_ib_mlx5_wqe_dump(uct_ib_iface_t *iface, enum ibv_qp_type qp_type, void *wqe, void *qstart, void *qend, int max_sge, uct_log_data_dump_func_t packet_dump_cb, - char *buffer, size_t max) + char *buffer, size_t max, uct_ib_log_sge_t *log_sge) { static uct_ib_opcode_t opcodes[] = { [MLX5_OPCODE_NOP] = { "NOP", 0 }, @@ -314,30 +314,35 @@ static void uct_ib_mlx5_wqe_dump(uct_ib_iface_t *iface, enum ibv_qp_type qp_type } /* Data segments*/ - i = 0; - inline_bitmap = 0; - - while ((ds > 0) && (i < sizeof(sg_list) / sizeof(sg_list[0]))) { - ds -= uct_ib_mlx5_parse_dseg(&seg, qstart, qend, sg_list, &i, &is_inline); - if (is_inline) { - inline_bitmap |= UCS_BIT(i-1); + if (log_sge == NULL) { + i = 0; + inline_bitmap = 0; + + while ((ds > 0) && (i < sizeof(sg_list) / sizeof(sg_list[0]))) { + ds -= uct_ib_mlx5_parse_dseg(&seg, qstart, qend, sg_list, &i, &is_inline); + if (is_inline) { + inline_bitmap |= UCS_BIT(i-1); + } + s += strlen(s); } - s += strlen(s); } - uct_ib_log_dump_sg_list(iface, UCT_AM_TRACE_TYPE_SEND, sg_list, - ucs_min(i, max_sge), - inline_bitmap, packet_dump_cb, s, ends - s); + uct_ib_log_dump_sg_list(iface, UCT_AM_TRACE_TYPE_SEND, + log_sge ? log_sge->sg_list : sg_list, + log_sge ? log_sge->num_sge : ucs_min(i, max_sge), + log_sge ? log_sge->inline_bitmap : inline_bitmap, + packet_dump_cb, s, ends - s); } void __uct_ib_mlx5_log_tx(const char *file, int line, const char *function, uct_ib_iface_t *iface, enum ibv_qp_type qp_type, void *wqe, void *qstart, void *qend, int max_sge, + uct_ib_log_sge_t *log_sge, uct_log_data_dump_func_t packet_dump_cb) { char buf[256] = {0}; uct_ib_mlx5_wqe_dump(iface, qp_type, wqe, qstart, qend, max_sge, packet_dump_cb, - buf, sizeof(buf) - 1); + buf, sizeof(buf) - 1, log_sge); uct_log_data(file, line, function, buf); } diff --git a/src/uct/ib/mlx5/ib_mlx5_log.h b/src/uct/ib/mlx5/ib_mlx5_log.h index 4c7d68b16998..7a5429c283c5 100644 --- a/src/uct/ib/mlx5/ib_mlx5_log.h +++ b/src/uct/ib/mlx5/ib_mlx5_log.h @@ -12,6 +12,12 @@ #include +typedef struct uct_ib_log_sge { + int num_sge; + uint64_t inline_bitmap; + struct ibv_sge sg_list[2]; +} uct_ib_log_sge_t; + ucs_status_t uct_ib_mlx5_completion_with_err(struct mlx5_err_cqe *ecqe, ucs_log_level_t log_level); @@ -19,6 +25,7 @@ ucs_status_t uct_ib_mlx5_completion_with_err(struct mlx5_err_cqe *ecqe, void __uct_ib_mlx5_log_tx(const char *file, int line, const char *function, uct_ib_iface_t *iface, enum ibv_qp_type qp_type, void *wqe, void *qstart, void *qend, int max_log_sge, + uct_ib_log_sge_t *log_sge, uct_log_data_dump_func_t packet_dump_cb); void __uct_ib_mlx5_log_rx(const char *file, int line, const char *function, @@ -29,10 +36,10 @@ void __uct_ib_mlx5_log_rx(const char *file, int line, const char *function, void uct_ib_mlx5_cqe_dump(const char *file, int line, const char *function, struct mlx5_cqe64 *cqe); -#define uct_ib_mlx5_log_tx(_iface, _qpt, _wqe, _qstart, _qend, _max_sge, _dump_cb) \ +#define uct_ib_mlx5_log_tx(_iface, _qpt, _wqe, _qstart, _qend, _max_sge, _log_sge, _dump_cb) \ if (ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) { \ __uct_ib_mlx5_log_tx(__FILE__, __LINE__, __FUNCTION__, \ - _iface, _qpt, _wqe, _qstart, _qend, _max_sge, _dump_cb); \ + _iface, _qpt, _wqe, _qstart, _qend, _max_sge, _log_sge, _dump_cb); \ } 
#define uct_ib_mlx5_log_rx(_iface, _qpt, _cqe, _data, _dump_cb) \ diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.c b/src/uct/ib/rc/accel/rc_mlx5_common.c index b66fac77b9f0..5652366aa587 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_common.c +++ b/src/uct/ib/rc/accel/rc_mlx5_common.c @@ -25,12 +25,10 @@ ucs_stats_class_t uct_rc_mlx5_iface_stats_class = { ucs_config_field_t uct_mlx5_common_config_table[] = { #if HAVE_IBV_EXP_DM - /* TODO: set 1k limit */ - {"DM_SIZE", "0", + {"DM_SIZE", "2k", "Device Memory segment size (0 - disabled)", ucs_offsetof(uct_common_mlx5_iface_config_t, dm.seg_len), UCS_CONFIG_TYPE_MEMUNITS}, - /* TODO: set 1 buffer limit */ - {"DM_COUNT", "0", + {"DM_COUNT", "1", "Device Memory segments count (0 - disabled)", ucs_offsetof(uct_common_mlx5_iface_config_t, dm.count), UCS_CONFIG_TYPE_UINT}, #endif @@ -287,8 +285,9 @@ uct_rc_mlx5_iface_common_dm_tl_init(uct_mlx5_dm_data_t *data, struct ibv_exp_alloc_dm_attr dm_attr; struct ibv_exp_reg_mr_in mr_in; - data->seg_len = ucs_align_up(config->dm.seg_len, - sizeof(uct_rc_mlx5_dm_copy_data_t)); + data->seg_len = ucs_min(ucs_align_up(config->dm.seg_len, + sizeof(uct_rc_mlx5_dm_copy_data_t)), + iface->super.config.seg_size); data->seg_count = config->dm.count; data->seg_attached = 0; data->device = uct_ib_iface_device(&iface->super); diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.h b/src/uct/ib/rc/accel/rc_mlx5_common.h index 357c384e9455..f927f0e9e817 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_common.h +++ b/src/uct/ib/rc/accel/rc_mlx5_common.h @@ -426,7 +426,8 @@ uct_rc_mlx5_common_post_send(uct_rc_iface_t *iface, enum ibv_qp_type qp_type, uct_rc_txqp_t *txqp, uct_ib_mlx5_txwq_t *txwq, uint8_t opcode, uint8_t opmod, uint8_t fm_ce_se, size_t wqe_size, uct_ib_mlx5_base_av_t *av, - struct mlx5_grh_av *grh_av, uint32_t imm, int max_log_sge) + struct mlx5_grh_av *grh_av, uint32_t imm, int max_log_sge, + uct_ib_log_sge_t *log_sge) { struct mlx5_wqe_ctrl_seg *ctrl; uint16_t posted; @@ -446,7 +447,8 @@ uct_rc_mlx5_common_post_send(uct_rc_iface_t *iface, enum ibv_qp_type qp_type, uct_ib_mlx5_set_dgram_seg((void*)(ctrl + 1), av, grh_av, qp_type); } - uct_ib_mlx5_log_tx(&iface->super, qp_type, ctrl, txwq->qstart, txwq->qend, max_log_sge, + uct_ib_mlx5_log_tx(&iface->super, qp_type, ctrl, txwq->qstart, + txwq->qend, max_log_sge, log_sge, ((opcode == MLX5_OPCODE_SEND) || (opcode == MLX5_OPCODE_SEND_IMM)) ? 
uct_rc_ep_am_packet_dump : NULL); @@ -549,7 +551,7 @@ uct_rc_mlx5_txqp_inline_post(uct_rc_iface_t *iface, enum ibv_qp_type qp_type, } uct_rc_mlx5_common_post_send(iface, qp_type, txqp, txwq, opcode, 0, fm_ce_se, - wqe_size, av, grh_av, imm_val_be, max_log_sge); + wqe_size, av, grh_av, imm_val_be, max_log_sge, NULL); } /* @@ -579,7 +581,7 @@ uct_rc_mlx5_txqp_dptr_post(uct_rc_iface_t *iface, enum ibv_qp_type qp_type, /* ATOMIC */ uint64_t compare_mask, uint64_t compare, uint64_t swap_add, /* AV */ uct_ib_mlx5_base_av_t *av, struct mlx5_grh_av *grh_av, size_t av_size, uint8_t fm_ce_se, uint32_t imm_val_be, - int max_log_sge) + int max_log_sge, uct_ib_log_sge_t *log_sge) { struct mlx5_wqe_ctrl_seg *ctrl; struct mlx5_wqe_raddr_seg *raddr; @@ -709,7 +711,8 @@ uct_rc_mlx5_txqp_dptr_post(uct_rc_iface_t *iface, enum ibv_qp_type qp_type, uct_rc_mlx5_common_post_send(iface, qp_type, txqp, txwq, (opcode_flags & UCT_RC_MLX5_OPCODE_MASK), opmod, - fm_ce_se, wqe_size, av, grh_av, imm_val_be, max_log_sge); + fm_ce_se, wqe_size, av, grh_av, imm_val_be, + max_log_sge, log_sge); } static UCS_F_ALWAYS_INLINE @@ -801,7 +804,7 @@ void uct_rc_mlx5_txqp_dptr_post_iov(uct_rc_iface_t *iface, enum ibv_qp_type qp_t uct_rc_mlx5_common_post_send(iface, qp_type, txqp, txwq, opcode_flags & UCT_RC_MLX5_OPCODE_MASK, 0, fm_ce_se, wqe_size, av, grh_av, ib_imm_be, - max_log_sge); + max_log_sge, NULL); } #if IBV_EXP_HW_TM @@ -927,7 +930,7 @@ uct_rc_mlx5_txqp_tag_inline_post(uct_rc_iface_t *iface, enum ibv_qp_type qp_type fm_ce_se |= uct_rc_iface_tx_moderation(iface, txqp, MLX5_WQE_CTRL_CQ_UPDATE); uct_rc_mlx5_common_post_send(iface, qp_type, txqp, txwq, opcode, 0, fm_ce_se, - wqe_size, av, grh_av, imm_val_be, INT_MAX); + wqe_size, av, grh_av, imm_val_be, INT_MAX, NULL); } static UCS_F_ALWAYS_INLINE void @@ -1297,4 +1300,107 @@ uct_rc_mlx5_iface_common_poll_rx(uct_rc_mlx5_iface_common_t *mlx5_common_iface, return count; } +#if HAVE_IBV_EXP_DM +/* DM memory should be written by 8 bytes (int64) to eliminate + * processor cache issues. To make this used uct_rc_mlx5_dm_copy_data_t + * datatype where first hdr_len bytes are filled by message header + * and tail is filled by head of message. */ +static void UCS_F_ALWAYS_INLINE +uct_rc_mlx5_iface_common_copy_to_dm(uct_rc_mlx5_dm_copy_data_t *cache, size_t hdr_len, + const void *payload, size_t length, void *dm, + uct_ib_log_sge_t *log_sge) +{ + size_t head = (cache && hdr_len) ? 
ucs_min(length, sizeof(*cache) - hdr_len) : 0; + size_t body = ucs_align_down(length - head, sizeof(uint64_t)); + size_t tail = length - (head + body); + uint64_t *dst = dm; + uint64_t padding = 0; /* init by 0 to suppress valgrind error */ + int i = 0; + + ucs_assert(sizeof(*cache) >= hdr_len); + ucs_assert(head + body + tail == length); + ucs_assert(tail < sizeof(uint64_t)); + + /* copy head of payload to tail of cache */ + memcpy(cache->in + hdr_len, payload, head); + + UCS_STATIC_ASSERT(sizeof(*cache) == sizeof(cache->out)); + UCS_STATIC_ASSERT(sizeof(cache->in) == sizeof(cache->out)); + UCS_STATIC_ASSERT(sizeof(log_sge->sg_list) / sizeof(log_sge->sg_list[0]) >= 2); + + /* condition is static-evaluated */ + if (cache && hdr_len) { + /* atomically by 8 bytes copy data to DM */ + *(dst++) = cache->out[0]; + *(dst++) = cache->out[1]; + if (ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) { + log_sge->sg_list[0].addr = (uint64_t)cache; + log_sge->sg_list[0].length = (uint32_t)hdr_len; + i++; + } + } + if (ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) { + log_sge->sg_list[i].addr = (uint64_t)payload; + log_sge->sg_list[i].length = (uint32_t)length; + i++; + } + log_sge->num_sge = i; + + /* copy payload to DM */ + UCS_WORD_COPY(dst, payload + head, uint64_t, body); + if (tail) { + memcpy(&padding, payload + head + body, tail); + *(dst + (body / sizeof(uint64_t))) = padding; + } +} + +static ucs_status_t UCS_F_ALWAYS_INLINE +uct_rc_mlx5_common_dm_make_data(uct_rc_mlx5_iface_common_t *iface, + uct_rc_iface_t *rc_iface, + uct_rc_mlx5_dm_copy_data_t *cache, + size_t hdr_len, const void *payload, + unsigned length, + uct_rc_iface_send_desc_t **desc_p, + void **buffer_p, uct_ib_log_sge_t *log_sge) +{ + uct_rc_iface_send_desc_t *desc; + void *buffer; + + ucs_assert(iface->dm.dm != NULL); + ucs_assert(log_sge != NULL); + + desc = ucs_mpool_get_inline(&iface->dm.dm->mp); + if (ucs_unlikely(desc == NULL)) { + /* in case if no resources available - fallback to bcopy */ + UCT_RC_IFACE_GET_TX_DESC(rc_iface, &rc_iface->tx.mp, desc); + desc->super.handler = (uct_rc_send_handler_t)ucs_mpool_put; + buffer = desc + 1; + + /* condition is static-evaluated, no performance penalty */ + if (cache && hdr_len) { + memcpy(buffer, cache->out, hdr_len); + } + memcpy(UCS_PTR_BYTE_OFFSET(buffer, hdr_len), payload, length); + log_sge->num_sge = 0; + } else { + /* desc must be partially initialized by mpool. 
+ * hint to valgrind to make it defined */ + VALGRIND_MAKE_MEM_DEFINED(desc, sizeof(*desc)); + ucs_assert(desc->super.buffer != NULL); + buffer = (void*)(desc->super.buffer - iface->dm.dm->start_va); + + uct_rc_mlx5_iface_common_copy_to_dm(cache, hdr_len, payload, + length, desc->super.buffer, log_sge); + if (ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) { + log_sge->sg_list[0].lkey = log_sge->sg_list[1].lkey = desc->lkey; + log_sge->inline_bitmap = 0; + } + } + + *desc_p = desc; + *buffer_p = buffer; + return UCS_OK; +} +#endif + #endif diff --git a/src/uct/ib/rc/accel/rc_mlx5_ep.c b/src/uct/ib/rc/accel/rc_mlx5_ep.c index 47dfd6ae0b20..1eb1d581432a 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_ep.c +++ b/src/uct/ib/rc/accel/rc_mlx5_ep.c @@ -24,14 +24,15 @@ uct_rc_mlx5_txqp_bcopy_post(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, uct_ib_m unsigned opcode, unsigned length, /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey, uint8_t fm_ce_se, uint32_t imm_val_be, - uct_rc_iface_send_desc_t *desc, const void *buffer) + uct_rc_iface_send_desc_t *desc, const void *buffer, + uct_ib_log_sge_t *log_sge) { desc->super.sn = txwq->sw_pi; uct_rc_mlx5_txqp_dptr_post(iface, IBV_QPT_RC, txqp, txwq, opcode, buffer, length, &desc->lkey, rdma_raddr, uct_ib_md_direct_rkey(rdma_rkey), 0, 0, 0, - NULL, NULL, 0, fm_ce_se, imm_val_be, INT_MAX); + NULL, NULL, 0, fm_ce_se, imm_val_be, INT_MAX, log_sge); uct_rc_txqp_add_send_op(txqp, &desc->super); } @@ -86,7 +87,7 @@ uct_rc_mlx5_ep_atomic_post(uct_rc_mlx5_ep_t *ep, unsigned opcode, opcode, desc + 1, length, &desc->lkey, remote_addr, ib_rkey, compare_mask, compare, swap_add, - NULL, NULL, 0, signal, 0, INT_MAX); + NULL, NULL, 0, signal, 0, INT_MAX, NULL); UCT_TL_EP_STAT_ATOMIC(&ep->super.super); uct_rc_txqp_add_send_op(&ep->super.txqp, &desc->super); @@ -175,44 +176,6 @@ uct_rc_mlx5_ep_am_short_inline(uct_ep_h tl_ep, uint8_t id, uint64_t hdr, } #if HAVE_IBV_EXP_DM -/* DM memory should be written by 8 bytes (int64) to eliminate - * processor cache issues. To make this used uct_rc_mlx5_dm_copy_data_t - * datatype where first hdr_len bytes are filled by message header - * and tail is filled by head of message. */ -static void UCS_F_ALWAYS_INLINE -uct_rc_mlx5_ep_copy_to_dm(uct_rc_mlx5_dm_copy_data_t *cache, size_t hdr_len, - const void *payload, size_t length, void *dm) -{ - size_t head = (cache && hdr_len) ? 
ucs_min(length, sizeof(*cache) - hdr_len) : 0; - size_t body = ucs_align_down(length - head, sizeof(uint64_t)); - size_t tail = length - (head + body); - uint64_t *dst = dm; - uint64_t padding = 0; /* init by 0 to suppress valgrind error */ - - ucs_assert(sizeof(*cache) >= hdr_len); - ucs_assert(head + body + tail == length); - ucs_assert(tail < sizeof(uint64_t)); - - /* copy head of payload to tail of cache */ - memcpy(cache->in + hdr_len, payload, head); - - UCS_STATIC_ASSERT(sizeof(*cache) == sizeof(cache->out)); - UCS_STATIC_ASSERT(sizeof(cache->in) == sizeof(cache->out)); - - /* condition is static-evaluated */ - if (cache && hdr_len) { - /* atomically by 8 bytes copy data to DM */ - *(dst++) = cache->out[0]; - *(dst++) = cache->out[1]; - } - - UCS_WORD_COPY(dst, payload + head, uint64_t, body); - if (tail) { - memcpy(&padding, payload + head + body, tail); - *(dst + (body / sizeof(uint64_t))) = padding; - } -} - static ucs_status_t UCS_F_ALWAYS_INLINE uct_rc_mlx5_ep_short_dm(uct_rc_mlx5_ep_t *ep, uct_rc_mlx5_dm_copy_data_t *cache, size_t hdr_len, const void *payload, unsigned length, @@ -223,31 +186,21 @@ uct_rc_mlx5_ep_short_dm(uct_rc_mlx5_ep_t *ep, uct_rc_mlx5_dm_copy_data_t *cache, uct_rc_iface_t *rc_iface = &iface->super; uct_rc_iface_send_desc_t *desc; void *buffer; + ucs_status_t status; + uct_ib_log_sge_t log_sge; - desc = ucs_mpool_get_inline(&iface->mlx5_common.dm.dm->mp); - if (ucs_unlikely(desc == NULL)) { - /* in case if no resources available - fallback to bcopy */ - UCT_RC_IFACE_GET_TX_DESC(rc_iface, &rc_iface->tx.mp, desc); - desc->super.handler = (uct_rc_send_handler_t)ucs_mpool_put; - buffer = desc + 1; - - /* condition is static-evaluated, no performance penalty */ - if (cache && hdr_len) { - memcpy(desc + 1, cache->out, hdr_len); - } - memcpy(UCS_PTR_BYTE_OFFSET(desc + 1, hdr_len), payload, length); - } else { - ucs_assert(desc->super.buffer != NULL); - buffer = (void*)(desc->super.buffer - iface->mlx5_common.dm.dm->start_va); - - uct_rc_mlx5_ep_copy_to_dm(cache, hdr_len, payload, - length, desc->super.buffer); - + status = uct_rc_mlx5_common_dm_make_data(&iface->mlx5_common, &iface->super, + cache, hdr_len, payload, length, &desc, + &buffer, &log_sge); + if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) { + return status; } + uct_rc_mlx5_txqp_bcopy_post(rc_iface, &ep->super.txqp, &ep->tx.wq, opcode, hdr_len + length, rdma_raddr, rdma_rkey, fm_ce_se, - 0, desc, buffer); + 0, desc, buffer, + log_sge.num_sge ? 
&log_sge : NULL); return UCS_OK; } #endif @@ -298,7 +251,8 @@ ssize_t uct_rc_mlx5_ep_put_bcopy(uct_ep_h tl_ep, uct_pack_callback_t pack_cb, uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq, MLX5_OPCODE_RDMA_WRITE, length, remote_addr, - rkey, MLX5_WQE_CTRL_CQ_UPDATE, 0, desc, desc + 1); + rkey, MLX5_WQE_CTRL_CQ_UPDATE, 0, desc, desc + 1, + NULL); UCT_TL_EP_STAT_OP(&ep->super.super, PUT, BCOPY, length); return length; } @@ -341,7 +295,8 @@ ucs_status_t uct_rc_mlx5_ep_get_bcopy(uct_ep_h tl_ep, uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq, MLX5_OPCODE_RDMA_READ, length, remote_addr, - rkey, MLX5_WQE_CTRL_CQ_UPDATE, 0, desc, desc + 1); + rkey, MLX5_WQE_CTRL_CQ_UPDATE, 0, desc, desc + 1, + NULL); UCT_TL_EP_STAT_OP(&ep->super.super, GET, BCOPY, length); return UCS_INPROGRESS; } @@ -426,7 +381,8 @@ ssize_t uct_rc_mlx5_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id, uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq, MLX5_OPCODE_SEND, sizeof(uct_rc_hdr_t) + length, - 0, 0, MLX5_WQE_CTRL_SOLICITED, 0, desc, desc + 1); + 0, 0, MLX5_WQE_CTRL_SOLICITED, 0, desc, desc + 1, + NULL); UCT_TL_EP_STAT_OP(&ep->super.super, AM, BCOPY, length); UCT_RC_UPDATE_FC(iface, &ep->super, id); return length; @@ -615,7 +571,7 @@ ucs_status_t uct_rc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag, uct_rc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t); uct_rc_mlx5_dm_copy_data_t cache; - if (ucs_likely((sizeof(uct_rc_am_short_hdr_t) + length <= UCT_IB_MLX5_AM_MAX_SHORT(0)) || + if (ucs_likely((sizeof(struct ibv_exp_tmh) + length <= UCT_IB_MLX5_AM_MAX_SHORT(0)) || !iface->mlx5_common.dm.dm)) { #endif return uct_rc_mlx5_ep_tag_eager_short_inline(tl_ep, tag, data, length); @@ -658,7 +614,7 @@ ssize_t uct_rc_mlx5_ep_tag_eager_bcopy(uct_ep_h tl_ep, uct_tag_t tag, uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq, opcode, sizeof(struct ibv_exp_tmh) + length, 0, 0, MLX5_WQE_CTRL_SOLICITED, ib_imm, - desc, desc + 1); + desc, desc + 1, NULL); return length; } diff --git a/src/uct/ib/rc/base/rc_iface.h b/src/uct/ib/rc/base/rc_iface.h index b57e0a5052e3..23eb3af48c6a 100644 --- a/src/uct/ib/rc/base/rc_iface.h +++ b/src/uct/ib/rc/base/rc_iface.h @@ -55,8 +55,8 @@ #define UCT_RC_IFACE_GET_TX_PUT_BCOPY_DESC(_iface, _mp, _desc, _pack_cb, _arg, _length) \ UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc) \ - desc->super.handler = (uct_rc_send_handler_t)ucs_mpool_put; \ - _length = pack_cb(_desc + 1, _arg); \ + (_desc)->super.handler = (uct_rc_send_handler_t)ucs_mpool_put; \ + _length = _pack_cb(_desc + 1, _arg); \ UCT_SKIP_ZERO_LENGTH(_length, _desc); #define UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC(_iface, _mp, _desc, _unpack_cb, _comp, _arg, _length) \ diff --git a/src/uct/ib/ud/accel/ud_mlx5.c b/src/uct/ib/ud/accel/ud_mlx5.c index a4701995c336..4134e12ce19b 100644 --- a/src/uct/ib/ud/accel/ud_mlx5.c +++ b/src/uct/ib/ud/accel/ud_mlx5.c @@ -59,7 +59,7 @@ uct_ud_mlx5_post_send(uct_ud_mlx5_iface_t *iface, uct_ud_mlx5_ep_t *ep, uct_ib_mlx5_log_tx(&iface->super.super, IBV_QPT_UD, ctrl, iface->tx.wq.qstart, iface->tx.wq.qend, - max_log_sge, uct_ud_dump_packet); + max_log_sge, NULL, uct_ud_dump_packet); iface->super.tx.available -= uct_ib_mlx5_post_send(&iface->tx.wq, ctrl, wqe_size); ucs_assert((int16_t)iface->tx.wq.bb_max >= iface->super.tx.available); diff --git a/test/gtest/uct/test_p2p_err.cc b/test/gtest/uct/test_p2p_err.cc index 602683656044..d8819a4dc8a1 100644 --- a/test/gtest/uct/test_p2p_err.cc +++ b/test/gtest/uct/test_p2p_err.cc @@ -26,7 +26,7 @@ class uct_p2p_err_test : public 
uct_p2p_test { }; uct_p2p_err_test() : - uct_p2p_test(0, uct_error_handler_t(ucs_empty_function_return_success)) { + uct_p2p_test(0, error_handler) { } static size_t pack_cb(void *dest, void *arg) @@ -130,6 +130,20 @@ class uct_p2p_err_test : public uct_p2p_test { static ucs_status_t last_error; +private: + static ucs_status_t + error_handler(void *arg, uct_ep_h ep, ucs_status_t status) { + uct_p2p_err_test *self = static_cast(arg); + const p2p_resource *r = dynamic_cast(self->GetParam()); + ucs_assert_always(r != NULL); + if (r->loopback) { + /* In loop back IB TLs can generate QP flush error before remote + * access error. */ + ucs_log(UCS_LOG_LEVEL_ERROR, "Error on ep %p with status %s is handled", + ep, ucs_status_string(status)); + } + return UCS_OK; + } }; ucs_status_t uct_p2p_err_test::last_error = UCS_OK; diff --git a/test/gtest/uct/uct_test.cc b/test/gtest/uct/uct_test.cc index b57d50d1a685..290053a3f4aa 100644 --- a/test/gtest/uct/uct_test.cc +++ b/test/gtest/uct/uct_test.cc @@ -284,6 +284,7 @@ uct_test::entity* uct_test::create_entity(size_t rx_headroom, iface_params.rx_headroom = rx_headroom; iface_params.open_mode = UCT_IFACE_OPEN_MODE_DEVICE; iface_params.err_handler = err_handler; + iface_params.err_handler_arg = this; entity *new_ent = new entity(*GetParam(), m_iface_config, &iface_params, m_md_config); return new_ent;
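
Note on the libperf.c change above: seeding collective_status with the local status (instead of UCS_OK) is what lets a local capability-check failure propagate to every peer when the statuses are exchanged. The following is a minimal standalone sketch of that reduction, not the actual RTE exchange code; the function name and the peer_status array are illustrative only.

#include <ucs/type/status.h>

/* Sketch: combine the local status with the statuses received from all
 * peers. If any process failed, the collective result is a failure. */
static ucs_status_t exchange_status_sketch(ucs_status_t local_status,
                                           const ucs_status_t *peer_status,
                                           unsigned num_peers)
{
    ucs_status_t collective_status = local_status; /* was UCS_OK before the fix */
    unsigned i;

    for (i = 0; i < num_peers; ++i) {
        if (peer_status[i] != UCS_OK) {
            collective_status = peer_status[i];
        }
    }
    return collective_status;
}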
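
Note on the kh_foreach_key macro added to khash.h: it is used in parser.c to release the duplicated environment-variable names before destroying the hash, without needing a dummy value variable. Below is a minimal standalone sketch of the same cleanup pattern, assuming the stock khash string-set helpers (KHASH_SET_INIT_STR, kh_init, kh_put, kh_destroy); the set name and the inserted keys are illustrative.

#include <stdlib.h>
#include <string.h>
#include <ucs/datastruct/khash.h>

KHASH_SET_INIT_STR(env_vars)   /* set of duplicated strings */

static void env_vars_cleanup(khash_t(env_vars) *set)
{
    const char *key;

    /* release every stored key, then the table itself */
    kh_foreach_key(set, key, {
        free((void*)key);
    })
    kh_destroy(env_vars, set);
}

int main(void)
{
    khash_t(env_vars) *set = kh_init(env_vars);
    int ret;

    kh_put(env_vars, set, strdup("UCX_TLS"), &ret);
    kh_put(env_vars, set, strdup("UCX_NET_DEVICES"), &ret);
    env_vars_cleanup(set);
    return 0;
}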
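
Note on uct_rc_mlx5_iface_common_copy_to_dm(): device memory is written only with aligned 64-bit stores, so the payload is split into a head (packed after the header into the 16-byte cache), an 8-byte-aligned body, and a zero-padded tail. Below is a simplified standalone sketch of that split, under the assumption hdr_len <= 16; the real code uses uct_rc_mlx5_dm_copy_data_t and UCS_WORD_COPY, and additionally fills the SG list used for trace logging.

#include <stdint.h>
#include <string.h>

/* Sketch: write 'hdr_len' header bytes ('hdr', at most 16) followed by
 * 'length' payload bytes to the device-memory segment 'dm' using only
 * aligned 64-bit stores. */
static void copy_to_dm_sketch(uint64_t *dm, const void *hdr, size_t hdr_len,
                              const void *payload, size_t length)
{
    uint64_t cache[2] = {0, 0};
    /* head: payload bytes that fill up the 16-byte header cache */
    size_t head = (hdr_len > 0) ?
                  ((length < sizeof(cache) - hdr_len) ? length :
                                                        sizeof(cache) - hdr_len) : 0;
    /* body: largest 8-byte-aligned middle part of the remaining payload */
    size_t body = (length - head) & ~(sizeof(uint64_t) - 1);
    /* tail: leftover bytes, flushed as one zero-padded 64-bit word */
    size_t tail = length - head - body;
    uint64_t padding = 0;
    size_t i;

    if (hdr_len > 0) {
        memcpy(cache, hdr, hdr_len);
        memcpy((char*)cache + hdr_len, payload, head);
        *dm++ = cache[0];          /* two aligned 64-bit stores for hdr + head */
        *dm++ = cache[1];
    }

    for (i = 0; i < body / sizeof(uint64_t); ++i) {
        memcpy(&dm[i], (const char*)payload + head + i * sizeof(uint64_t),
               sizeof(uint64_t));  /* 64-bit store per body word */
    }

    if (tail > 0) {
        memcpy(&padding, (const char*)payload + head + body, tail);
        dm[body / sizeof(uint64_t)] = padding;   /* final padded word */
    }
}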