diff --git a/src/uct/ib/dc/accel/dc_mlx5.c b/src/uct/ib/dc/accel/dc_mlx5.c index 1ec2b3c59db..beb1e6eca97 100644 --- a/src/uct/ib/dc/accel/dc_mlx5.c +++ b/src/uct/ib/dc/accel/dc_mlx5.c @@ -484,6 +484,9 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface) ucs_memory_cpu_load_fence(); + ucs_assertv(!(cqe->op_own & (MLX5_INLINE_SCATTER_32|MLX5_INLINE_SCATTER_64)), + "tx inline scatter not supported"); + qp_num = ntohl(cqe->sop_drop_qpn) & UCS_MASK(UCT_IB_QPN_ORDER); dci = uct_dc_iface_dci_find(&iface->super, qp_num); txqp = &iface->super.tx.dcis[dci].txqp; @@ -571,54 +574,61 @@ static uct_rc_iface_ops_t uct_dc_mlx5_iface_ops = { }; +static ucs_status_t uct_dc_mlx5_iface_init_dcis(uct_dc_mlx5_iface_t *iface) +{ + ucs_status_t status; + uint16_t bb_max; + int i; + + bb_max = 0; + for (i = 0; i < iface->super.tx.ndci; i++) { + status = uct_ib_mlx5_get_txwq(iface->super.super.super.super.worker, + iface->super.tx.dcis[i].txqp.qp, + &iface->dci_wqs[i]); + if (status != UCS_OK) { + return status; + } + + + bb_max = iface->dci_wqs[i].bb_max; + uct_rc_txqp_available_set(&iface->super.tx.dcis[i].txqp, bb_max); + } + + iface->super.super.config.tx_qp_len = bb_max; + return UCS_OK; +} + static UCS_CLASS_INIT_FUNC(uct_dc_mlx5_iface_t, uct_md_h md, uct_worker_h worker, const char *dev_name, size_t rx_headroom, const uct_iface_config_t *tl_config) { - ucs_status_t status; uct_dc_iface_config_t *config = ucs_derived_of(tl_config, uct_dc_iface_config_t); - int i; + ucs_status_t status; ucs_trace_func(""); UCS_CLASS_CALL_SUPER_INIT(uct_dc_iface_t, &uct_dc_mlx5_iface_ops, md, worker, dev_name, rx_headroom, 0, config); - status = uct_rc_mlx5_iface_common_init(&self->mlx5_common, &self->super.super, &config->super.super); if (status != UCS_OK) { - return status; + goto err; } - self->dci_wqs = ucs_malloc(self->super.tx.ndci * sizeof(uct_ib_mlx5_txwq_t), "dc"); - if (self->dci_wqs == NULL) { - uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common); - return UCS_ERR_NO_MEMORY; + status = uct_dc_mlx5_iface_init_dcis(self); + if (status != UCS_OK) { + goto err_common_cleanup; } - for (i = 0; i < self->super.tx.ndci; i++) { - status = uct_ib_mlx5_get_txwq(self->super.super.super.super.worker, - self->super.tx.dcis[i].txqp.qp, - &self->dci_wqs[i]); - if (status != UCS_OK) { - uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common); - ucs_free(self->dci_wqs); - return status; - } - uct_rc_txqp_available_set(&self->super.tx.dcis[i].txqp, - self->dci_wqs[i].bb_max); - } - /* convert tx_qp_len to wq bulding blocks */ - self->super.super.config.tx_qp_len = self->dci_wqs[0].bb_max; - /* disable tx moderation for now */ - self->super.super.config.tx_moderation = 0; - - /** - * TODO: only register progress when we have a connection - */ + /* TODO: only register progress when we have a connection */ uct_worker_progress_register(worker, uct_dc_mlx5_iface_progress, self); - ucs_debug("created iface %p", self); + ucs_debug("created dc iface %p", self); + return UCS_OK; + +err_common_cleanup: + uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common); +err: return status; } @@ -628,7 +638,6 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_iface_t) uct_worker_progress_unregister(self->super.super.super.super.worker, uct_dc_mlx5_iface_progress, self); uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common); - ucs_free(self->dci_wqs); } UCS_CLASS_DEFINE(uct_dc_mlx5_iface_t, uct_dc_iface_t); diff --git a/src/uct/ib/dc/accel/dc_mlx5.h b/src/uct/ib/dc/accel/dc_mlx5.h index 8effb0346c6..41c3a44d3a4 100644 --- a/src/uct/ib/dc/accel/dc_mlx5.h +++ b/src/uct/ib/dc/accel/dc_mlx5.h @@ -11,15 +11,18 @@ #include #include + typedef struct uct_dc_mlx5_iface { - uct_dc_iface_t super; - uct_rc_mlx5_iface_common_t mlx5_common; - uct_ib_mlx5_txwq_t *dci_wqs; + uct_dc_iface_t super; + uct_rc_mlx5_iface_common_t mlx5_common; + uct_ib_mlx5_txwq_t dci_wqs[UCT_DC_IFACE_MAX_DCIS]; } uct_dc_mlx5_iface_t; + typedef struct uct_dc_mlx5_ep { - uct_dc_ep_t super; - struct mlx5_wqe_av av; + uct_dc_ep_t super; + struct mlx5_wqe_av av; } uct_dc_mlx5_ep_t; + #endif diff --git a/src/uct/ib/dc/base/dc_ep.c b/src/uct/ib/dc/base/dc_ep.c index 34802484dd0..7f1c82d940e 100644 --- a/src/uct/ib/dc/base/dc_ep.c +++ b/src/uct/ib/dc/base/dc_ep.c @@ -50,11 +50,16 @@ ucs_status_t uct_dc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *r) * - dci is either assigned or can be assigned * - dci has resources */ - if (uct_rc_iface_has_tx_resources(&iface->super) && - ((ep->dci != UCT_DC_EP_NO_DCI) || uct_dc_iface_dci_can_alloc(iface)) && - uct_dc_iface_dci_ep_can_send(ep)) - { - return UCS_ERR_BUSY; + if (uct_rc_iface_has_tx_resources(&iface->super)) { + if (ep->dci == UCT_DC_EP_NO_DCI) { + if (uct_dc_iface_dci_can_alloc(iface)) { + return UCS_ERR_BUSY; + } + } else { + if (uct_dc_iface_dci_ep_can_send(ep)) { + return UCS_ERR_BUSY; + } + } } UCS_STATIC_ASSERT(sizeof(ucs_arbiter_elem_t) <= UCT_PENDING_REQ_PRIV_LEN); diff --git a/src/uct/ib/dc/base/dc_iface.c b/src/uct/ib/dc/base/dc_iface.c index 1814255c178..5f9cb82ef0f 100644 --- a/src/uct/ib/dc/base/dc_iface.c +++ b/src/uct/ib/dc/base/dc_iface.c @@ -12,7 +12,7 @@ const static char *uct_dc_tx_policy_names[] = { [UCT_DC_TX_POLICY_LAST] = NULL }; -static ucs_status_t uct_dc_iface_tgt_create(uct_dc_iface_t *iface) +static ucs_status_t uct_dc_iface_create_dct(uct_dc_iface_t *iface) { struct ibv_exp_dct_init_attr init_attr; @@ -100,53 +100,46 @@ static ucs_status_t uct_dc_iface_dci_connect(uct_dc_iface_t *iface, uct_rc_txqp_ return UCS_OK; } -static ucs_status_t uct_dc_iface_dcis_create(uct_dc_iface_t *iface, uct_dc_iface_config_t *config) +static void uct_dc_iface_dcis_destroy(uct_dc_iface_t *iface, int max) { - ucs_status_t status; int i; + for (i = 0; i < max; i++) { + uct_rc_txqp_cleanup(&iface->tx.dcis[i].txqp); + } +} + +static ucs_status_t uct_dc_iface_create_dcis(uct_dc_iface_t *iface, + uct_dc_iface_config_t *config) +{ struct ibv_qp_cap cap; + ucs_status_t status; + int i; ucs_debug("creating %d dci(s)", iface->tx.ndci); - iface->tx.dcis = ucs_malloc(iface->tx.ndci * sizeof(uct_dc_dci_t), "dc"); - if (iface->tx.dcis == NULL) { - return UCS_ERR_NO_MEMORY; - } - - iface->tx.dcis_stack = ucs_malloc(iface->tx.ndci * sizeof(uint8_t), "dc"); - if (iface->tx.dcis_stack == NULL) { - free(iface->tx.dcis); - return UCS_ERR_NO_MEMORY; - } iface->tx.stack_top = 0; - for (i = 0; i < iface->tx.ndci; i++) { status = uct_rc_txqp_init(&iface->tx.dcis[i].txqp, &iface->super, IBV_EXP_QPT_DC_INI, &cap UCS_STATS_ARG(iface->super.stats)); if (status != UCS_OK) { - goto create_err; + goto err; } status = uct_dc_iface_dci_connect(iface, &iface->tx.dcis[i].txqp); if (status != UCS_OK) { - goto create_err; + uct_rc_txqp_cleanup(&iface->tx.dcis[i].txqp); + goto err; } iface->tx.dcis_stack[i] = i; iface->tx.dcis[i].ep = NULL; } - config->max_inline = cap.max_inline_data; + return UCS_OK; -create_err: - for (;i >= 0; i--) { - if (iface->tx.dcis[i].txqp.qp) { - ibv_destroy_qp(iface->tx.dcis[i].txqp.qp); - } - } - ucs_free(iface->tx.dcis); - ucs_free(iface->tx.dcis_stack); +err: + uct_dc_iface_dcis_destroy(iface, i); return status; } @@ -160,38 +153,51 @@ UCS_CLASS_INIT_FUNC(uct_dc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md, UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t, ops, md, worker, dev_name, rx_headroom, rx_priv_len, &config->super.super); - self->tx.ndci = config->ndci; - self->tx.policy = config->tx_policy; - ucs_debug("using %s dci selection algorithm", - uct_dc_tx_policy_names[self->tx.policy]); + if (config->ndci < 1) { + ucs_error("dc interface must have at least 1 dci (requested: %d)", + config->ndci); + return UCS_ERR_INVALID_PARAM; + } + + if (config->ndci > UCT_DC_IFACE_MAX_DCIS) { + ucs_error("dc interface can have at most %d dcis (requested: %d)", + UCT_DC_IFACE_MAX_DCIS, config->ndci); + return UCS_ERR_INVALID_PARAM; + } + + self->tx.ndci = config->ndci; + self->tx.policy = config->tx_policy; + self->super.config.tx_moderation = 0; /* disable tx moderation for dcs */ + + ucs_debug("dc iface %p: using '%s' policy with %d dcis", self, + uct_dc_tx_policy_names[self->tx.policy], self->tx.ndci); + /* create DC target */ - status = uct_dc_iface_tgt_create(self); + status = uct_dc_iface_create_dct(self); if (status != UCS_OK) { - return status; + goto err; } /* create DC initiators */ - status = uct_dc_iface_dcis_create(self, config); + status = uct_dc_iface_create_dcis(self, config); if (status != UCS_OK) { - ibv_exp_destroy_dct(self->rx.dct); - return status; + goto err_destroy_dct; } + ucs_arbiter_init(&self->tx.dci_arbiter); return UCS_OK; + +err_destroy_dct: + ibv_exp_destroy_dct(self->rx.dct); +err: + return status; } static UCS_CLASS_CLEANUP_FUNC(uct_dc_iface_t) { - int i; - ucs_trace_func(""); ibv_exp_destroy_dct(self->rx.dct); - - for (i = 0; i < self->tx.ndci; i++) { - uct_rc_txqp_cleanup(&self->tx.dcis[i].txqp); - } - ucs_free(self->tx.dcis); - ucs_free(self->tx.dcis_stack); + uct_dc_iface_dcis_destroy(self, self->tx.ndci); ucs_arbiter_cleanup(&self->tx.dci_arbiter); } @@ -201,15 +207,16 @@ UCS_CLASS_DEFINE(uct_dc_iface_t, uct_rc_iface_t); ucs_config_field_t uct_dc_iface_config_table[] = { {"DC_", "", NULL, ucs_offsetof(uct_dc_iface_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_rc_verbs_iface_config_table)}, + {"NUM_DCI", "8", - "number of QPs (dynamic connection initiators) allocated by the interface", + "Number of DC initiator QPs used by the interface (up to " UCS_PP_QUOTE(UCT_DC_IFACE_MAX_DCIS) ")", ucs_offsetof(uct_dc_iface_config_t, ndci), UCS_CONFIG_TYPE_UINT}, {"TX_POLICY", "dcs_quota", - "Specifies how dci is selected by the endpoint. The policies are:\n" + "Specifies how DC initiator (dci) is selected by the endpoint. The policies are:\n" "\n" "dcs the endpoint either uses already assigned dci or a dci is allocated in the LIFO order.\n" - " The dci is released once it has no outstanding operations.\n" + " The dci is released once it has no outstanding operations.\n" "\n" "dcs_quota same as dcs. In addition the dci is scheduled for release\n" " if it can not transmit and there are endpoints waiting for the dci allocation.\n" diff --git a/src/uct/ib/dc/base/dc_iface.h b/src/uct/ib/dc/base/dc_iface.h index 752fb39c6c4..8ba14bc62bc 100644 --- a/src/uct/ib/dc/base/dc_iface.h +++ b/src/uct/ib/dc/base/dc_iface.h @@ -6,17 +6,21 @@ #ifndef UCT_DC_IFACE_H #define UCT_DC_IFACE_H + #include #include - #include +#define UCT_DC_IFACE_MAX_DCIS 16 + +typedef struct uct_dc_ep uct_dc_ep_t; + + typedef struct uct_dc_iface_addr { uct_ib_uint24_t qp_num; uint8_t umr_id; } uct_dc_iface_addr_t; -typedef struct uct_dc_ep uct_dc_ep_t; typedef enum { UCT_DC_TX_POLICY_DCS, @@ -24,6 +28,7 @@ typedef enum { UCT_DC_TX_POLICY_LAST } uct_dc_tx_policty_t; + typedef struct uct_dc_iface_config { /* work around to do multiple inheritance: * dc_verbs needs both dc_iface_config and verbs_common_iface config @@ -31,28 +36,32 @@ typedef struct uct_dc_iface_config { uct_rc_verbs_iface_config_t super; int ndci; int tx_policy; - int max_inline; } uct_dc_iface_config_t; + typedef struct uct_dc_dci { - uct_rc_txqp_t txqp; /* DCI qp */ - uct_dc_ep_t *ep; /* points to an endpoint that currently own - the dci. Relevant only for dcs and dcs quota policies */ + uct_rc_txqp_t txqp; /* DCI qp */ + uct_dc_ep_t *ep; /* points to an endpoint that currently + owns the dci. Relevant only for dcs + and dcs quota policies. */ } uct_dc_dci_t; + typedef struct uct_dc_iface { - uct_rc_iface_t super; + uct_rc_iface_t super; struct { - uct_dc_dci_t *dcis; /* array of dcis. dcis[ndci] */ - /* LIFO is only relevant for dcs allocation policy */ - uint8_t *dcis_stack; /* LIFO of indexes of available dcis */ - uint8_t stack_top; /* dci stack top */ - uint8_t ndci; /* Number of DCIs */ - uint8_t policy; /* dci selection algorithm */ - ucs_arbiter_t dci_arbiter; + uct_dc_dci_t dcis[UCT_DC_IFACE_MAX_DCIS]; /* Array of dcis */ + uint8_t ndci; /* Number of DCIs */ + uct_dc_tx_policty_t policy; /* dci selection algorithm */ + + /* LIFO is only relevant for dcs allocation policy */ + uint8_t stack_top; /* dci stack top */ + uint8_t dcis_stack[UCT_DC_IFACE_MAX_DCIS]; /* LIFO of indexes of available dcis */ + + ucs_arbiter_t dci_arbiter; } tx; struct { - struct ibv_exp_dct *dct; + struct ibv_exp_dct *dct; } rx; } uct_dc_iface_t; @@ -71,6 +80,9 @@ ucs_status_t uct_dc_device_query_tl_resources(uct_ib_device_t *dev, uct_tl_resource_desc_t **resources_p, unsigned *num_resources_p); +ucs_status_t uct_dc_iface_flush(uct_iface_h tl_iface, unsigned flags, uct_completion_t *comp); + + /* TODO: * use a better seach algorithm (perfect hash, bsearch, hash) ??? * @@ -122,6 +134,4 @@ static inline ucs_status_t uct_dc_iface_flush_dci(uct_dc_iface_t *iface, int dci return UCS_INPROGRESS; } -ucs_status_t uct_dc_iface_flush(uct_iface_h tl_iface, unsigned flags, uct_completion_t *comp); - #endif diff --git a/src/uct/ib/dc/verbs/dc_verbs.c b/src/uct/ib/dc/verbs/dc_verbs.c index 04aa4811d96..72baae1814e 100644 --- a/src/uct/ib/dc/verbs/dc_verbs.c +++ b/src/uct/ib/dc/verbs/dc_verbs.c @@ -646,47 +646,53 @@ static UCS_CLASS_INIT_FUNC(uct_dc_verbs_iface_t, uct_md_h md, uct_worker_h worke const char *dev_name, size_t rx_headroom, const uct_iface_config_t *tl_config) { - int i; - ucs_status_t status; uct_dc_iface_config_t *config = ucs_derived_of(tl_config, uct_dc_iface_config_t); + struct ibv_qp_init_attr dci_init_attr; + struct ibv_qp_attr dci_attr; + ucs_status_t status; + int i, ret; + ucs_trace_func(""); UCS_CLASS_CALL_SUPER_INIT(uct_dc_iface_t, &uct_dc_verbs_iface_ops, md, worker, dev_name, rx_headroom, 0, config); - self->verbs_common.config.max_inline = config->max_inline; - - status = uct_rc_verbs_iface_common_init(&self->verbs_common, &self->super.super, &config->super); - if (status != UCS_OK) { - return status; - } - uct_dc_verbs_iface_init_wrs(self); - status = uct_rc_verbs_iface_prepost_recvs_common(&self->super.super); + status = uct_rc_verbs_iface_common_init(&self->verbs_common, &self->super.super, + &config->super); if (status != UCS_OK) { - goto out; + goto err; } - self->dcis_txcnt = ucs_malloc(self->super.tx.ndci * sizeof(uct_rc_verbs_txcnt_t), "dc"); - if (self->dcis_txcnt == NULL) { - status = UCS_ERR_NO_MEMORY; - goto out; + ret = ibv_query_qp(self->super.tx.dcis[0].txqp.qp, &dci_attr, 0, + &dci_init_attr); + if (ret) { + ucs_error("ibv_query_qp() failed: %m"); + goto err_common_cleanup; } + self->verbs_common.config.max_inline = dci_init_attr.cap.max_inline_data; + for (i = 0; i < self->super.tx.ndci; i++) { uct_rc_verbs_txcnt_init(&self->dcis_txcnt[i]); - uct_rc_txqp_available_set(&self->super.tx.dcis[i].txqp, self->super.super.config.tx_qp_len); + uct_rc_txqp_available_set(&self->super.tx.dcis[i].txqp, + self->super.super.config.tx_qp_len); } - - /** - * TODO: only register progress when we have a connection - */ + + status = uct_rc_verbs_iface_prepost_recvs_common(&self->super.super); + if (status != UCS_OK) { + goto err_common_cleanup; + } + + /* TODO: only register progress when we have a connection */ uct_worker_progress_register(worker, uct_dc_verbs_iface_progress, self); - ucs_debug("created iface %p", self); + ucs_debug("created dc iface %p", self); return UCS_OK; -out: + +err_common_cleanup: uct_rc_verbs_iface_common_cleanup(&self->verbs_common); +err: return status; } @@ -695,7 +701,6 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_verbs_iface_t) ucs_trace_func(""); uct_worker_progress_unregister(self->super.super.super.super.worker, uct_dc_verbs_iface_progress, self); - ucs_free(self->dcis_txcnt); uct_rc_verbs_iface_common_cleanup(&self->verbs_common); } diff --git a/src/uct/ib/dc/verbs/dc_verbs.h b/src/uct/ib/dc/verbs/dc_verbs.h index ea32289629b..e385595b4d6 100644 --- a/src/uct/ib/dc/verbs/dc_verbs.h +++ b/src/uct/ib/dc/verbs/dc_verbs.h @@ -11,18 +11,21 @@ #include #include + typedef struct uct_dc_verbs_iface { - uct_dc_iface_t super; - struct ibv_exp_send_wr inl_am_wr; - struct ibv_exp_send_wr inl_rwrite_wr; - uct_rc_verbs_iface_common_t verbs_common; - uct_rc_verbs_txcnt_t *dcis_txcnt; + uct_dc_iface_t super; + struct ibv_exp_send_wr inl_am_wr; + struct ibv_exp_send_wr inl_rwrite_wr; + uct_rc_verbs_iface_common_t verbs_common; + uct_rc_verbs_txcnt_t dcis_txcnt[UCT_DC_IFACE_MAX_DCIS]; } uct_dc_verbs_iface_t; + typedef struct uct_dc_verbs_ep { - uct_dc_ep_t super; - uint32_t dest_qpn; - struct ibv_ah *ah; + uct_dc_ep_t super; + uint32_t dest_qpn; + struct ibv_ah *ah; } uct_dc_verbs_ep_t; + #endif diff --git a/src/uct/ib/rc/accel/rc_mlx5_iface.c b/src/uct/ib/rc/accel/rc_mlx5_iface.c index 35ae64f08e7..358cacd7eb4 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_iface.c +++ b/src/uct/ib/rc/accel/rc_mlx5_iface.c @@ -47,6 +47,9 @@ uct_rc_mlx5_iface_poll_tx(uct_rc_mlx5_iface_t *iface) ucs_memory_cpu_load_fence(); + ucs_assertv(!(cqe->op_own & (MLX5_INLINE_SCATTER_32|MLX5_INLINE_SCATTER_64)), + "tx inline scatter not supported"); + qp_num = ntohl(cqe->sop_drop_qpn) & UCS_MASK(UCT_IB_QPN_ORDER); ep = ucs_derived_of(uct_rc_iface_lookup_ep(&iface->super, qp_num), uct_rc_mlx5_ep_t); ucs_assert(ep != NULL); diff --git a/src/uct/ib/rc/base/rc_ep.c b/src/uct/ib/rc/base/rc_ep.c index a87d762c2de..3fb9053f7e8 100644 --- a/src/uct/ib/rc/base/rc_ep.c +++ b/src/uct/ib/rc/base/rc_ep.c @@ -56,17 +56,21 @@ ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface, status = uct_rc_iface_qp_create(iface, qp_type, &txqp->qp, cap); if (status != UCS_OK) { - return status; + goto err; } status = UCS_STATS_NODE_ALLOC(&txqp->stats, &uct_rc_txqp_stats_class, stats_parent, "-0x%x", txqp->qp->qp_num); if (status != UCS_OK) { - ibv_destroy_qp(txqp->qp); - return status; + goto err_destroy_qp; } return UCS_OK; + +err_destroy_qp: + ibv_destroy_qp(txqp->qp); +err: + return status; } void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp)