Skip to content

Commit

Permalink
Merge pull request #997 from yosefe/topic/dc-code-cleanup
Browse files Browse the repository at this point in the history
UCT/DC: Use inline arrays for DCIs.
  • Loading branch information
yosefe authored Sep 21, 2016
2 parents 8516932 + 869018c commit 69545a1
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 136 deletions.
69 changes: 39 additions & 30 deletions src/uct/ib/dc/accel/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,9 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface)

ucs_memory_cpu_load_fence();

ucs_assertv(!(cqe->op_own & (MLX5_INLINE_SCATTER_32|MLX5_INLINE_SCATTER_64)),
"tx inline scatter not supported");

qp_num = ntohl(cqe->sop_drop_qpn) & UCS_MASK(UCT_IB_QPN_ORDER);
dci = uct_dc_iface_dci_find(&iface->super, qp_num);
txqp = &iface->super.tx.dcis[dci].txqp;
Expand Down Expand Up @@ -571,54 +574,61 @@ static uct_rc_iface_ops_t uct_dc_mlx5_iface_ops = {
};


/*
 * For each of the iface's ndci DCIs, fill iface->dci_wqs[i] by calling
 * uct_ib_mlx5_get_txwq() on the DCI's QP, and set that DCI's txqp available
 * count to the resulting bb_max.
 *
 * Returns the first non-UCS_OK status reported by uct_ib_mlx5_get_txwq();
 * in that case the loop stops at the failing DCI and config.tx_qp_len is
 * left unmodified. Returns UCS_OK otherwise.
 *
 * On success config.tx_qp_len is set to the bb_max of the last DCI processed
 * (or to 0 if ndci is 0).
 */
static ucs_status_t uct_dc_mlx5_iface_init_dcis(uct_dc_mlx5_iface_t *iface)
{
ucs_status_t status;
uint16_t bb_max;
int i;

/* stays 0 if the loop below does not execute */
bb_max = 0;
for (i = 0; i < iface->super.tx.ndci; i++) {
status = uct_ib_mlx5_get_txwq(iface->super.super.super.super.worker,
iface->super.tx.dcis[i].txqp.qp,
&iface->dci_wqs[i]);
if (status != UCS_OK) {
/* nothing is undone here for DCIs handled in earlier iterations */
return status;
}


/* overwritten on every iteration, so the last DCI's value is the one kept.
 * NOTE(review): presumably all DCIs report the same bb_max - confirm */
bb_max = iface->dci_wqs[i].bb_max;
uct_rc_txqp_available_set(&iface->super.tx.dcis[i].txqp, bb_max);
}

/* tx_qp_len now holds the bb_max value taken from the work queue */
iface->super.super.config.tx_qp_len = bb_max;
return UCS_OK;
}

static UCS_CLASS_INIT_FUNC(uct_dc_mlx5_iface_t, uct_md_h md, uct_worker_h worker,
const char *dev_name, size_t rx_headroom,
const uct_iface_config_t *tl_config)
{
ucs_status_t status;
uct_dc_iface_config_t *config = ucs_derived_of(tl_config,
uct_dc_iface_config_t);
int i;
ucs_status_t status;

ucs_trace_func("");
UCS_CLASS_CALL_SUPER_INIT(uct_dc_iface_t, &uct_dc_mlx5_iface_ops, md,
worker, dev_name, rx_headroom, 0, config);


status = uct_rc_mlx5_iface_common_init(&self->mlx5_common, &self->super.super,
&config->super.super);
if (status != UCS_OK) {
return status;
goto err;
}

self->dci_wqs = ucs_malloc(self->super.tx.ndci * sizeof(uct_ib_mlx5_txwq_t), "dc");
if (self->dci_wqs == NULL) {
uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common);
return UCS_ERR_NO_MEMORY;
status = uct_dc_mlx5_iface_init_dcis(self);
if (status != UCS_OK) {
goto err_common_cleanup;
}

for (i = 0; i < self->super.tx.ndci; i++) {
status = uct_ib_mlx5_get_txwq(self->super.super.super.super.worker,
self->super.tx.dcis[i].txqp.qp,
&self->dci_wqs[i]);
if (status != UCS_OK) {
uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common);
ucs_free(self->dci_wqs);
return status;
}
uct_rc_txqp_available_set(&self->super.tx.dcis[i].txqp,
self->dci_wqs[i].bb_max);
}
/* convert tx_qp_len to wq bulding blocks */
self->super.super.config.tx_qp_len = self->dci_wqs[0].bb_max;
/* disable tx moderation for now */
self->super.super.config.tx_moderation = 0;

/**
* TODO: only register progress when we have a connection
*/
/* TODO: only register progress when we have a connection */
uct_worker_progress_register(worker, uct_dc_mlx5_iface_progress, self);
ucs_debug("created iface %p", self);
ucs_debug("created dc iface %p", self);
return UCS_OK;

err_common_cleanup:
uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common);
err:
return status;
}

Expand All @@ -628,7 +638,6 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_iface_t)
uct_worker_progress_unregister(self->super.super.super.super.worker,
uct_dc_mlx5_iface_progress, self);
uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common);
ucs_free(self->dci_wqs);
}

UCS_CLASS_DEFINE(uct_dc_mlx5_iface_t, uct_dc_iface_t);
Expand Down
13 changes: 8 additions & 5 deletions src/uct/ib/dc/accel/dc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
#include <uct/ib/dc/base/dc_ep.h>
#include <uct/ib/rc/accel/rc_mlx5_common.h>


typedef struct uct_dc_mlx5_iface {
uct_dc_iface_t super;
uct_rc_mlx5_iface_common_t mlx5_common;
uct_ib_mlx5_txwq_t *dci_wqs;
uct_dc_iface_t super;
uct_rc_mlx5_iface_common_t mlx5_common;
uct_ib_mlx5_txwq_t dci_wqs[UCT_DC_IFACE_MAX_DCIS];
} uct_dc_mlx5_iface_t;


typedef struct uct_dc_mlx5_ep {
uct_dc_ep_t super;
struct mlx5_wqe_av av;
uct_dc_ep_t super;
struct mlx5_wqe_av av;
} uct_dc_mlx5_ep_t;


#endif
15 changes: 10 additions & 5 deletions src/uct/ib/dc/base/dc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ ucs_status_t uct_dc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *r)
* - dci is either assigned or can be assigned
* - dci has resources
*/
if (uct_rc_iface_has_tx_resources(&iface->super) &&
((ep->dci != UCT_DC_EP_NO_DCI) || uct_dc_iface_dci_can_alloc(iface)) &&
uct_dc_iface_dci_ep_can_send(ep))
{
return UCS_ERR_BUSY;
if (uct_rc_iface_has_tx_resources(&iface->super)) {
if (ep->dci == UCT_DC_EP_NO_DCI) {
if (uct_dc_iface_dci_can_alloc(iface)) {
return UCS_ERR_BUSY;
}
} else {
if (uct_dc_iface_dci_ep_can_send(ep)) {
return UCS_ERR_BUSY;
}
}
}

UCS_STATIC_ASSERT(sizeof(ucs_arbiter_elem_t) <= UCT_PENDING_REQ_PRIV_LEN);
Expand Down
97 changes: 52 additions & 45 deletions src/uct/ib/dc/base/dc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const static char *uct_dc_tx_policy_names[] = {
[UCT_DC_TX_POLICY_LAST] = NULL
};

static ucs_status_t uct_dc_iface_tgt_create(uct_dc_iface_t *iface)
static ucs_status_t uct_dc_iface_create_dct(uct_dc_iface_t *iface)
{
struct ibv_exp_dct_init_attr init_attr;

Expand Down Expand Up @@ -100,53 +100,46 @@ static ucs_status_t uct_dc_iface_dci_connect(uct_dc_iface_t *iface, uct_rc_txqp_
return UCS_OK;
}

static ucs_status_t uct_dc_iface_dcis_create(uct_dc_iface_t *iface, uct_dc_iface_config_t *config)
static void uct_dc_iface_dcis_destroy(uct_dc_iface_t *iface, int max)
{
ucs_status_t status;
int i;
for (i = 0; i < max; i++) {
uct_rc_txqp_cleanup(&iface->tx.dcis[i].txqp);
}
}

static ucs_status_t uct_dc_iface_create_dcis(uct_dc_iface_t *iface,
uct_dc_iface_config_t *config)
{
struct ibv_qp_cap cap;
ucs_status_t status;
int i;

ucs_debug("creating %d dci(s)", iface->tx.ndci);
iface->tx.dcis = ucs_malloc(iface->tx.ndci * sizeof(uct_dc_dci_t), "dc");
if (iface->tx.dcis == NULL) {
return UCS_ERR_NO_MEMORY;
}

iface->tx.dcis_stack = ucs_malloc(iface->tx.ndci * sizeof(uint8_t), "dc");
if (iface->tx.dcis_stack == NULL) {
free(iface->tx.dcis);
return UCS_ERR_NO_MEMORY;
}

iface->tx.stack_top = 0;

for (i = 0; i < iface->tx.ndci; i++) {
status = uct_rc_txqp_init(&iface->tx.dcis[i].txqp, &iface->super,
IBV_EXP_QPT_DC_INI, &cap
UCS_STATS_ARG(iface->super.stats));
if (status != UCS_OK) {
goto create_err;
goto err;
}

status = uct_dc_iface_dci_connect(iface, &iface->tx.dcis[i].txqp);
if (status != UCS_OK) {
goto create_err;
uct_rc_txqp_cleanup(&iface->tx.dcis[i].txqp);
goto err;
}

iface->tx.dcis_stack[i] = i;
iface->tx.dcis[i].ep = NULL;
}
config->max_inline = cap.max_inline_data;

return UCS_OK;

create_err:
for (;i >= 0; i--) {
if (iface->tx.dcis[i].txqp.qp) {
ibv_destroy_qp(iface->tx.dcis[i].txqp.qp);
}
}
ucs_free(iface->tx.dcis);
ucs_free(iface->tx.dcis_stack);
err:
uct_dc_iface_dcis_destroy(iface, i);
return status;
}

Expand All @@ -160,38 +153,51 @@ UCS_CLASS_INIT_FUNC(uct_dc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md,
UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t, ops, md, worker, dev_name, rx_headroom,
rx_priv_len, &config->super.super);

self->tx.ndci = config->ndci;
self->tx.policy = config->tx_policy;
ucs_debug("using %s dci selection algorithm",
uct_dc_tx_policy_names[self->tx.policy]);
if (config->ndci < 1) {
ucs_error("dc interface must have at least 1 dci (requested: %d)",
config->ndci);
return UCS_ERR_INVALID_PARAM;
}

if (config->ndci > UCT_DC_IFACE_MAX_DCIS) {
ucs_error("dc interface can have at most %d dcis (requested: %d)",
UCT_DC_IFACE_MAX_DCIS, config->ndci);
return UCS_ERR_INVALID_PARAM;
}

self->tx.ndci = config->ndci;
self->tx.policy = config->tx_policy;
self->super.config.tx_moderation = 0; /* disable tx moderation for dcs */

ucs_debug("dc iface %p: using '%s' policy with %d dcis", self,
uct_dc_tx_policy_names[self->tx.policy], self->tx.ndci);

/* create DC target */
status = uct_dc_iface_tgt_create(self);
status = uct_dc_iface_create_dct(self);
if (status != UCS_OK) {
return status;
goto err;
}

/* create DC initiators */
status = uct_dc_iface_dcis_create(self, config);
status = uct_dc_iface_create_dcis(self, config);
if (status != UCS_OK) {
ibv_exp_destroy_dct(self->rx.dct);
return status;
goto err_destroy_dct;
}

ucs_arbiter_init(&self->tx.dci_arbiter);
return UCS_OK;

err_destroy_dct:
ibv_exp_destroy_dct(self->rx.dct);
err:
return status;
}

static UCS_CLASS_CLEANUP_FUNC(uct_dc_iface_t)
{
int i;

ucs_trace_func("");
ibv_exp_destroy_dct(self->rx.dct);

for (i = 0; i < self->tx.ndci; i++) {
uct_rc_txqp_cleanup(&self->tx.dcis[i].txqp);
}
ucs_free(self->tx.dcis);
ucs_free(self->tx.dcis_stack);
uct_dc_iface_dcis_destroy(self, self->tx.ndci);
ucs_arbiter_cleanup(&self->tx.dci_arbiter);
}

Expand All @@ -201,15 +207,16 @@ UCS_CLASS_DEFINE(uct_dc_iface_t, uct_rc_iface_t);
ucs_config_field_t uct_dc_iface_config_table[] = {
{"DC_", "", NULL,
ucs_offsetof(uct_dc_iface_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_rc_verbs_iface_config_table)},

{"NUM_DCI", "8",
"number of QPs (dynamic connection initiators) allocated by the interface",
"Number of DC initiator QPs used by the interface (up to " UCS_PP_QUOTE(UCT_DC_IFACE_MAX_DCIS) ")",
ucs_offsetof(uct_dc_iface_config_t, ndci), UCS_CONFIG_TYPE_UINT},

{"TX_POLICY", "dcs_quota",
"Specifies how dci is selected by the endpoint. The policies are:\n"
"Specifies how DC initiator (dci) is selected by the endpoint. The policies are:\n"
"\n"
"dcs the endpoint either uses already assigned dci or a dci is allocated in the LIFO order.\n"
" The dci is released once it has no outstanding operations.\n"
" The dci is released once it has no outstanding operations.\n"
"\n"
"dcs_quota same as dcs. In addition the dci is scheduled for release\n"
" if it can not transmit and there are endpoints waiting for the dci allocation.\n"
Expand Down
Loading

0 comments on commit 69545a1

Please sign in to comment.