Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/cxi: peer infrastructure support #10436

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 116 additions & 3 deletions prov/cxi/include/cxip.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
FI_REMOTE_COMM | FI_RMA_EVENT | FI_MULTI_RECV | FI_FENCE | FI_TRIGGER)
#define CXIP_EP_CAPS (CXIP_EP_PRI_CAPS | CXIP_EP_SEC_CAPS)
#define CXIP_DOM_CAPS (FI_LOCAL_COMM | FI_REMOTE_COMM | FI_AV_USER_ID)
#define CXIP_CAPS (CXIP_DOM_CAPS | CXIP_EP_CAPS)
#define CXIP_CAPS (CXIP_DOM_CAPS | CXIP_EP_CAPS | FI_PEER)
#define CXIP_MSG_ORDER (FI_ORDER_SAS | \
FI_ORDER_WAW | \
FI_ORDER_RMA_WAW | \
Expand Down Expand Up @@ -177,7 +177,7 @@
#define CXIP_MINOR_VERSION 1
#define CXIP_PROV_VERSION FI_VERSION(CXIP_MAJOR_VERSION, \
CXIP_MINOR_VERSION)
#define CXIP_FI_VERSION FI_VERSION(1, 21)
#define CXIP_FI_VERSION FI_VERSION(2, 0)
#define CXIP_WIRE_PROTO_VERSION 1

#define CXIP_COLL_MAX_CONCUR 8
Expand Down Expand Up @@ -853,6 +853,8 @@ struct cxip_domain {
ofi_spin_t lock;
ofi_atomic32_t ref;

struct fid_peer_srx *owner_srx;

uint32_t tclass;

struct cxip_eq *eq; //unused
Expand Down Expand Up @@ -1263,6 +1265,9 @@ struct cxip_req {
uint64_t trig_thresh;
struct cxip_cntr *trig_cntr;

/* pointer to the shared receive entry */
struct fi_peer_rx_entry *rx_entry;

/* CQ event fields, set according to fi_cq.3
* - set by provider
* - returned to user in completion event
Expand Down Expand Up @@ -1381,6 +1386,23 @@ struct cxip_evtq {
struct dlist_entry req_list;
};

/*
* Peer CQ callbacks.
* These callback definitions can be used by providers to define generic
* callbacks which can be assigned different functions to handle completion
* for an imported cq vs an internal cq
*/
struct cxip_peer_cq_cb {
	/* Completion write without a source address.  Bound to either
	 * ofi_cq_write() (internal CQ) or a peer-CQ forwarding shim
	 * (imported CQ) at CQ open time.
	 */
	int (*cq_comp)(struct util_cq *cq, void *context,
		uint64_t flags, size_t len, void *buf, uint64_t data,
		uint64_t tag);
	/* Completion write carrying the source fi_addr_t (FI_SOURCE). */
	int (*cq_comp_src)(struct util_cq *cq, void *context,
		uint64_t flags, size_t len, void *buf, uint64_t data,
		uint64_t tag, fi_addr_t addr);
	/* Error-completion write; err_entry is fully populated by caller. */
	int (*cq_err)(struct util_cq *cq,
		const struct fi_cq_err_entry *err_entry);
};

/*
* CXI Libfbric software completion queue
*/
Expand All @@ -1394,6 +1416,10 @@ struct cxip_cq {
*/
struct ofi_genlock ep_list_lock;

/* Peer CQ */
struct fid_peer_cq *peer_cq;
struct cxip_peer_cq_cb cq_cb;

/* Internal CXI wait object allocated only if required. */
struct cxil_wait_obj *priv_wait;

Expand Down Expand Up @@ -1436,6 +1462,8 @@ struct cxip_cntr {
struct cxip_ux_send {
struct dlist_entry rxc_entry;
struct cxip_req *req;
struct cxip_rxc *rxc;
struct fi_peer_rx_entry *rx_entry;
union c_event put_ev;
bool claimed; /* Reserved with FI_PEEK | FI_CLAIM */
};
Expand Down Expand Up @@ -3176,6 +3204,11 @@ double cxip_rep_sum(size_t count, double *values);
int cxip_check_auth_key_info(struct fi_info *info);
int cxip_gen_auth_key(struct fi_info *info, struct cxi_auth_key *key);

static inline struct fid_peer_srx *cxip_get_owner_srx(struct cxip_rxc *rxc)
{
return rxc->domain->owner_srx;
}

#define CXIP_FC_SOFTWARE_INITIATED -1

/* cxip_fc_reason() - Returns the event reason for portal state
Expand Down Expand Up @@ -3220,6 +3253,14 @@ ssize_t cxip_rma_common(enum fi_op_type op, struct cxip_txc *txc,
struct cxip_cntr *trig_cntr,
struct cxip_cntr *comp_cntr);

/*
 * cxip_discard() - Discard an unexpected message held in a peer RX entry.
 *
 * NOTE(review): presumably installed as the srx peer-ops discard callback —
 * confirm against the fid_peer_srx wiring.  Currently unimplemented; callers
 * receive -FI_ENOSYS and the unexpected message is left in place.
 */
static inline int cxip_discard(struct fi_peer_rx_entry *rx_entry)
{
	/* TODO: how do we discard a message properly? */
	return -FI_ENOSYS;
}

int cxip_unexp_start(struct fi_peer_rx_entry *entry);

/*
* Request variants:
* CXIP_RQ_AMO
Expand Down Expand Up @@ -3631,7 +3672,9 @@ int cxip_set_recv_match_id(struct cxip_rxc *rxc, fi_addr_t src_addr,
return FI_SUCCESS;
}

fi_addr_t cxip_recv_req_src_addr(struct cxip_req *req);
fi_addr_t cxip_recv_req_src_addr(struct cxip_rxc *rxc,
uint32_t init, uint16_t vni,
bool force);
int cxip_recv_req_alloc(struct cxip_rxc *rxc, void *buf, size_t len,
struct cxip_md *md, struct cxip_req **cxip_req,
int (*recv_cb)(struct cxip_req *req,
Expand Down Expand Up @@ -3683,4 +3726,74 @@ int cxip_domain_dwq_emit_amo(struct cxip_domain *dom, uint16_t vni,
struct c_dma_amo_cmd *amo, uint64_t flags,
bool fetching, bool flush);

/*
 * cxip_set_env_rx_match_mode() - Resolve RX match mode and hybrid-transition
 * environment parameters into cxip_env.
 *
 * Reads FI_CXI_RX_MATCH_MODE and the hybrid_* preemptive booleans, then
 * forces the preemptive options off (with a warning) when the resulting
 * match mode is not hybrid, since they only apply to hybrid operation.
 *
 * Fix: match-mode strings are now all compared case-insensitively.  The
 * original used strcasecmp() for "hardware" but case-sensitive strcmp() for
 * "software" and "hybrid", so e.g. "Software" silently fell through to the
 * unrecognized-value warning and hardware mode.
 */
static inline void cxip_set_env_rx_match_mode(void)
{
	char *param_str = NULL;

	fi_param_get_str(&cxip_prov, "rx_match_mode", &param_str);

	/* Parameters to tailor hybrid hardware to software transitions
	 * that are initiated by software.
	 */
	fi_param_define(&cxip_prov, "hybrid_preemptive", FI_PARAM_BOOL,
			"Enable/Disable low LE preemptive UX transitions.");
	fi_param_get_bool(&cxip_prov, "hybrid_preemptive",
			  &cxip_env.hybrid_preemptive);
	fi_param_define(&cxip_prov, "hybrid_recv_preemptive", FI_PARAM_BOOL,
			"Enable/Disable low LE preemptive recv transitions.");
	fi_param_get_bool(&cxip_prov, "hybrid_recv_preemptive",
			  &cxip_env.hybrid_recv_preemptive);
	fi_param_define(&cxip_prov, "hybrid_unexpected_msg_preemptive",
			FI_PARAM_BOOL,
			"Enable preemptive transition to software endpoint when number of hardware unexpected messages exceeds RX attribute size");
	fi_param_get_bool(&cxip_prov, "hybrid_unexpected_msg_preemptive",
			  &cxip_env.hybrid_unexpected_msg_preemptive);
	fi_param_define(&cxip_prov, "hybrid_posted_recv_preemptive",
			FI_PARAM_BOOL,
			"Enable preemptive transition to software endpoint when number of posted receives exceeds RX attribute size");
	fi_param_get_bool(&cxip_prov, "hybrid_posted_recv_preemptive",
			  &cxip_env.hybrid_posted_recv_preemptive);

	if (param_str) {
		/* Case-insensitive for all modes (was strcmp for two of
		 * them); unrecognized values fall back to hardware mode.
		 */
		if (!strcasecmp(param_str, "hardware")) {
			cxip_env.rx_match_mode = CXIP_PTLTE_HARDWARE_MODE;
			cxip_env.msg_offload = true;
		} else if (!strcasecmp(param_str, "software")) {
			cxip_env.rx_match_mode = CXIP_PTLTE_SOFTWARE_MODE;
			cxip_env.msg_offload = false;
		} else if (!strcasecmp(param_str, "hybrid")) {
			cxip_env.rx_match_mode = CXIP_PTLTE_HYBRID_MODE;
			cxip_env.msg_offload = true;
		} else {
			_CXIP_WARN(FI_LOG_FABRIC,
				   "Unrecognized rx_match_mode: %s\n",
				   param_str);
			cxip_env.rx_match_mode = CXIP_PTLTE_HARDWARE_MODE;
			cxip_env.msg_offload = true;
		}
	}

	/* The preemptive options only make sense in hybrid mode. */
	if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
	    cxip_env.hybrid_preemptive) {
		cxip_env.hybrid_preemptive = false;
		_CXIP_WARN(FI_LOG_FABRIC,
			   "Not in hybrid mode, ignoring preemptive\n");
	}

	if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
	    cxip_env.hybrid_recv_preemptive) {
		_CXIP_WARN(FI_LOG_FABRIC,
			   "Not in hybrid mode, ignore LE recv preemptive\n");
		cxip_env.hybrid_recv_preemptive = false;
	}

	if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
	    cxip_env.hybrid_posted_recv_preemptive) {
		_CXIP_WARN(FI_LOG_FABRIC,
			   "Not in hybrid mode, ignore hybrid_posted_recv_preemptive\n");
		cxip_env.hybrid_posted_recv_preemptive = false;
	}

	if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
	    cxip_env.hybrid_unexpected_msg_preemptive) {
		_CXIP_WARN(FI_LOG_FABRIC,
			   "Not in hybrid mode, ignore hybrid_unexpected_msg_preemptive\n");
		cxip_env.hybrid_unexpected_msg_preemptive = false;
	}
}

#endif
17 changes: 17 additions & 0 deletions prov/cxi/src/cxip_av.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,26 @@ struct cxip_addr *(*cxip_av_addr_in)(const void *addr) = insert_in;
void (*cxip_av_addr_out)(struct cxip_addr *addr_out,
struct cxip_addr *addr) = insert_out;

/*
 * cxip_get_addr() - Peer SRX callback resolving the source fi_addr_t of an
 * unexpected message from the initiator/VNI recorded in its put event.
 */
static fi_addr_t cxip_get_addr(struct fi_peer_rx_entry *entry)
{
	struct cxip_ux_send *ux = entry->peer_context;
	uint32_t initiator = ux->put_ev.tgt_long.initiator.initiator.process;
	uint16_t vni = ux->put_ev.tgt_long.vni;

	/* force=true: always perform the reverse lookup. */
	return cxip_recv_req_src_addr(ux->rxc, initiator, vni, true);
}

static int cxip_av_insert(struct fid_av *fid, const void *addr_in, size_t count,
fi_addr_t *fi_addr, uint64_t flags, void *context)
{
struct cxip_av *av = container_of(fid, struct cxip_av, av_fid.fid);
size_t i;
size_t success_cnt = 0;
int ret;
struct fid_peer_srx *owner_srx;

ret = cxip_av_insert_validate_args(fid, addr_in, count, fi_addr, flags,
context);
Expand All @@ -253,6 +266,10 @@ static int cxip_av_insert(struct fid_av *fid, const void *addr_in, size_t count,

cxip_av_unlock(av);

owner_srx = av->domain->owner_srx;
if (owner_srx)
owner_srx->owner_ops->foreach_unspec_addr(owner_srx, &cxip_get_addr);

return success_cnt;
}

Expand Down
83 changes: 77 additions & 6 deletions prov/cxi/src/cxip_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,46 @@
#define CXIP_DBG(...) _CXIP_DBG(FI_LOG_CQ, __VA_ARGS__)
#define CXIP_WARN(...) _CXIP_WARN(FI_LOG_CQ, __VA_ARGS__)

/*
 * cxip_peer_cq_comp() - cq_comp callback for an imported (peer) CQ.
 * Forwards the completion to the owner CQ; the source address is not
 * available on this path, so FI_ADDR_NOTAVAIL is reported.
 */
static int cxip_peer_cq_comp(struct util_cq *cq, void *context,
			     uint64_t flags, size_t len, void *buf,
			     uint64_t data, uint64_t tag)
{
	struct cxip_cq *cxi_cq = container_of(cq, struct cxip_cq, util_cq);
	struct fid_peer_cq *owner = cxi_cq->peer_cq;

	return owner->owner_ops->write(owner, context, flags, len, buf,
				       data, tag, FI_ADDR_NOTAVAIL);
}

/*
 * cxip_peer_cq_comp_src() - cq_comp_src callback for an imported (peer) CQ.
 * Forwards the completion, including the resolved source address, to the
 * owner CQ.
 */
static int cxip_peer_cq_comp_src(struct util_cq *cq, void *context,
				 uint64_t flags, size_t len, void *buf,
				 uint64_t data, uint64_t tag, fi_addr_t addr)
{
	struct cxip_cq *cxi_cq = container_of(cq, struct cxip_cq, util_cq);
	struct fid_peer_cq *owner = cxi_cq->peer_cq;

	return owner->owner_ops->write(owner, context, flags, len, buf,
				       data, tag, addr);
}

/*
 * cxip_peer_cq_err() - cq_err callback for an imported (peer) CQ.
 * Forwards the error completion to the owner CQ's writeerr handler.
 */
static int cxip_peer_cq_err(struct util_cq *cq,
			    const struct fi_cq_err_entry *err_entry)
{
	struct cxip_cq *cxi_cq = container_of(cq, struct cxip_cq, util_cq);
	struct fid_peer_cq *owner = cxi_cq->peer_cq;

	return owner->owner_ops->writeerr(owner, err_entry);
}

/*
* cxip_cq_req_complete() - Generate a completion event for the request.
*/
Expand All @@ -34,9 +74,9 @@ int cxip_cq_req_complete(struct cxip_req *req)
return FI_SUCCESS;
}

return ofi_cq_write(&req->cq->util_cq, (void *)req->context,
req->flags, req->data_len, (void *)req->buf,
req->data, req->tag);
return req->cq->cq_cb.cq_comp(&req->cq->util_cq, (void *)req->context,
req->flags, req->data_len, (void *)req->buf,
req->data, req->tag);
}

/*
Expand All @@ -50,7 +90,7 @@ int cxip_cq_req_complete_addr(struct cxip_req *req, fi_addr_t src)
return FI_SUCCESS;
}

return ofi_cq_write_src(&req->cq->util_cq, (void *)req->context,
return req->cq->cq_cb.cq_comp_src(&req->cq->util_cq, (void *)req->context,
req->flags, req->data_len, (void *)req->buf,
req->data, req->tag, src);
}
Expand Down Expand Up @@ -94,7 +134,7 @@ int cxip_cq_req_error(struct cxip_req *req, size_t olen,
err_entry.buf = (void *)(uintptr_t)req->buf;
err_entry.src_addr = src_addr;

return ofi_cq_write_error(&req->cq->util_cq, &err_entry);
return req->cq->cq_cb.cq_err(&req->cq->util_cq, &err_entry);
}

/*
Expand Down Expand Up @@ -316,6 +356,20 @@ static int cxip_cq_verify_attr(struct fi_cq_attr *attr)
return FI_SUCCESS;
}

/*
 * cxip_peer_cq_progress() - Progress entry point for a CQ used in peer mode.
 *
 * No completions are read through this interface (they flow to the owner
 * CQ), so any attempt to pass a buffer or a non-zero count is rejected
 * with -FI_EINVAL; otherwise the underlying util CQ is progressed.
 */
ssize_t cxip_peer_cq_progress(struct fid_cq *cq, void *buf, size_t count)
{
	struct util_cq *util_cq;

	if (buf != NULL || count != 0)
		return -FI_EINVAL;

	util_cq = container_of(cq, struct util_cq, cq_fid);
	cxip_util_cq_progress(util_cq);

	return 0;
}

/*
* cxip_cq_alloc_priv_wait - Allocate an internal wait channel for the CQ.
*/
Expand Down Expand Up @@ -400,7 +454,24 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
goto err_util_cq;
}

cxi_cq->util_cq.cq_fid.ops->strerror = &cxip_cq_strerror;
if (attr->flags & FI_PEER) {
struct fi_peer_cq_context *cq_cntxt = context;

if (!cq_cntxt)
return -FI_EINVAL;

cxi_cq->peer_cq = cq_cntxt->cq;

cxi_cq->cq_cb.cq_comp = cxip_peer_cq_comp;
cxi_cq->cq_cb.cq_comp_src = cxip_peer_cq_comp_src;
cxi_cq->cq_cb.cq_err = cxip_peer_cq_err;
} else {
cxi_cq->cq_cb.cq_comp = ofi_cq_write;
cxi_cq->cq_cb.cq_comp_src = ofi_cq_write_src;
cxi_cq->cq_cb.cq_err = ofi_cq_write_error;
cxi_cq->util_cq.cq_fid.ops->strerror = &cxip_cq_strerror;
}

cxi_cq->util_cq.cq_fid.fid.ops = &cxip_cq_fi_ops;

cxi_cq->domain = cxi_dom;
Expand Down
Loading