Skip to content

Commit

Permalink
Merge branch 'integration3' of https://github.com/leibin2014/ucx into…
Browse files Browse the repository at this point in the history
… integration3
  • Loading branch information
binl committed Apr 23, 2021
2 parents 3b5af27 + 9c4b6f5 commit 24b2bb9
Show file tree
Hide file tree
Showing 22 changed files with 346 additions and 173 deletions.
6 changes: 6 additions & 0 deletions src/ucp/core/ucp_proxy_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ void ucp_proxy_ep_replace(ucp_proxy_ep_t *proxy_ep)
ucp_ep_h ucp_ep = proxy_ep->ucp_ep;
ucp_lane_index_t lane;
uct_ep_h tl_ep = NULL;
ucs_status_t status;

ucs_assert(proxy_ep->uct_ep != NULL);
for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) {
Expand All @@ -224,6 +225,11 @@ void ucp_proxy_ep_replace(ucp_proxy_ep_t *proxy_ep)
ucp_ep->uct_eps[lane] = proxy_ep->uct_ep;
tl_ep = ucp_ep->uct_eps[lane];
proxy_ep->uct_ep = NULL;
status = uct_ep_enable_keep_alive(tl_ep, 1);
if (status != UCS_OK) {
ucs_diag("ep %p: uct_ep_enable_keep_alive(tl_ep=%p, 1) failed %s",
ucp_ep, tl_ep, ucs_status_string(status));
}
}
}

Expand Down
18 changes: 6 additions & 12 deletions src/ucp/proto/proto_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,11 @@ void ucp_proto_am_zcopy_completion(uct_completion_t *self,
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t,
send.state.uct_comp);
if (req->send.state.dt.offset == req->send.length) {
ucp_proto_am_zcopy_req_complete(req, status);
} else if (status != UCS_OK) {
ucs_assert(req->send.state.uct_comp.count == 0);
ucs_assert(status != UCS_INPROGRESS);

/* NOTE: the request is in pending queue if data was not completely sent,
* just dereg the buffer here and complete request on purge
* pending later.
*/
ucp_request_send_buffer_dereg(req);
req->send.state.uct_comp.func = NULL;

if (req->send.state.dt.offset != req->send.length) {
/* Cannot complete since not all fragments were posted yet */
return;
}

ucp_proto_am_zcopy_req_complete(req, status);
}
4 changes: 2 additions & 2 deletions src/ucp/stream/stream_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_send_nb,

UCP_CONTEXT_CHECK_FEATURE_FLAGS(ep->worker->context, UCP_FEATURE_STREAM,
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM));
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
UCS_ASYNC_BLOCK(&ep->worker->async);

ucs_trace_req("stream_send_nb buffer %p count %zu to %s cb %p flags %u",
buffer, count, ucp_ep_peer_name(ep), cb, flags);
Expand Down Expand Up @@ -135,7 +135,7 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_send_nb,
ucp_ep_config(ep)->stream.proto);

out:
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
UCS_ASYNC_UNBLOCK(&ep->worker->async);
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions src/ucp/wireup/wireup_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ UCS_CLASS_INIT_FUNC(ucp_wireup_ep_t, ucp_ep_h ucp_ep)
{
static uct_iface_ops_t ops = {
.ep_connect_to_ep = ucp_wireup_ep_connect_to_ep,
.ep_enable_keep_alive= (uct_ep_enable_keep_alive_func_t)ucs_empty_function_return_unsupported,
.ep_flush = ucp_wireup_ep_flush,
.ep_query = (uct_ep_query_func_t)ucs_empty_function_return_not_connected,
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(ucp_wireup_ep_t),
Expand Down
3 changes: 3 additions & 0 deletions src/uct/api/tl.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ typedef ucs_status_t (*uct_ep_connect_to_ep_func_t)(uct_ep_h ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr);

typedef ucs_status_t (*uct_ep_enable_keep_alive_func_t)(uct_ep_h ep, int enable);

typedef ucs_status_t (*uct_iface_accept_func_t)(uct_iface_h iface,
uct_conn_request_h conn_request);

Expand Down Expand Up @@ -341,6 +343,7 @@ typedef struct uct_iface_ops {
uct_ep_destroy_func_t ep_destroy;
uct_ep_get_address_func_t ep_get_address;
uct_ep_connect_to_ep_func_t ep_connect_to_ep;
uct_ep_enable_keep_alive_func_t ep_enable_keep_alive;
uct_iface_accept_func_t iface_accept;
uct_iface_reject_func_t iface_reject;

Expand Down
11 changes: 11 additions & 0 deletions src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -2064,6 +2064,17 @@ ucs_status_t uct_ep_connect_to_ep(uct_ep_h ep, const uct_device_addr_t *dev_addr
const uct_ep_addr_t *ep_addr);


/**
* @ingroup UCT_RESOURCE
* @brief enable/disable keep alive protocol on the endpoint.
*
* @param [in] ep Endpoint to enable keep alive on.
* @param [in] enable 1 - enable, 0 - disable keep alive
* @return UCS_OK In case of success
* UCS_ERR_UNSUPPORTED If transport does not support keep alive
*/
ucs_status_t uct_ep_enable_keep_alive(uct_ep_h ep, int enable);

/**
* @ingroup UCT_MD
* @brief Query for memory domain attributes.
Expand Down
6 changes: 6 additions & 0 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ ucs_status_t uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep,
ops->ep_fence = (uct_ep_fence_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_check = (uct_ep_check_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_connect_to_ep = (uct_ep_connect_to_ep_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_enable_keep_alive= (uct_ep_enable_keep_alive_func_t)ucs_empty_function_return_ep_timeout,
ops->ep_query = (uct_ep_query_func_t)ucs_empty_function_return_ep_timeout;
ops->ep_destroy = uct_ep_failed_destroy;
ops->ep_get_address = (uct_ep_get_address_func_t)ucs_empty_function_return_ep_timeout;
Expand Down Expand Up @@ -558,6 +559,11 @@ ucs_status_t uct_ep_connect_to_ep(uct_ep_h ep, const uct_device_addr_t *dev_addr
return ep->iface->ops.ep_connect_to_ep(ep, dev_addr, ep_addr);
}

ucs_status_t uct_ep_enable_keep_alive(uct_ep_h ep, int enable)
{
return ep->iface->ops.ep_enable_keep_alive(ep, enable);
}

ucs_status_t uct_cm_client_ep_conn_notify(uct_ep_h ep)
{
return ep->iface->ops.cm_ep_conn_notify(ep);
Expand Down
2 changes: 1 addition & 1 deletion src/uct/ib/base/ib_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ UCS_CLASS_INIT_FUNC(uct_ib_iface_t, uct_ib_iface_ops_t *ops, uct_md_h md,
goto err_destroy_comp_channel;
}

inl = config->rx.inl;
inl = config->tx.inl_resp;
status = uct_ib_iface_create_cq(self, init_attr->tx_cq_len, &inl,
preferred_cpu, init_attr->flags,
&self->cq[UCT_IB_DIR_TX]);
Expand Down
3 changes: 2 additions & 1 deletion src/uct/ib/mlx5/dv/ib_mlx5_dv.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ ucs_status_t uct_ib_mlx5_devx_create_qp(uct_ib_iface_t *iface,
UCT_IB_MLX5DV_SET(qpc, qpc, cqn_rcv, dvrcq.cqn);
UCT_IB_MLX5DV_SET(qpc, qpc, log_sq_size, ucs_ilog2_or0(max_tx));
UCT_IB_MLX5DV_SET(qpc, qpc, log_rq_size, ucs_ilog2_or0(max_rx));
UCT_IB_MLX5DV_SET(qpc, qpc, cs_req, UCT_IB_MLX5_QPC_CS_REQ_UP_TO_64B);
UCT_IB_MLX5DV_SET(qpc, qpc, cs_req,
uct_ib_mlx5_qpc_cs_req(attr->max_inl_recv));
UCT_IB_MLX5DV_SET(qpc, qpc, cs_res,
uct_ib_mlx5_qpc_cs_res(attr->max_inl_resp));
UCT_IB_MLX5DV_SET64(qpc, qpc, dbr_addr, qp->devx.dbrec->offset);
Expand Down
2 changes: 1 addition & 1 deletion src/uct/ib/mlx5/ib_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ ucs_status_t uct_ib_mlx5_create_cq(struct ibv_context *context, int cqe,
}

*cq_p = cq;
*inl = dv_attr.cqe_size / 2;
*inl = (*inl > 0) ? (dv_attr.cqe_size / 2) : 0;
return UCS_OK;
#else
return uct_ib_verbs_create_cq(context, cqe, channel, comp_vector,
Expand Down
2 changes: 2 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ ucs_status_t uct_rc_mlx5_ep_connect_to_ep(uct_ep_h tl_ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr);

ucs_status_t uct_rc_mlx5_ep_enable_keep_alive(uct_ep_h tl_ep, int enable);

unsigned uct_rc_mlx5_iface_progress(void *arg);

ucs_status_t uct_rc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag,
Expand Down
9 changes: 8 additions & 1 deletion src/uct/ib/rc/accel/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,18 @@ ucs_status_t uct_rc_mlx5_ep_connect_to_ep(uct_ep_h tl_ep,
}

ep->atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id);
ep->connected = 1;

return UCS_OK;
}

ucs_status_t uct_rc_mlx5_ep_enable_keep_alive(uct_ep_h tl_ep, int enable)
{
uct_rc_mlx5_ep_t *rc_mlx5_ep = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t);

rc_mlx5_ep->connected = enable;
return UCS_OK;
}

#if IBV_HW_TM

ucs_status_t uct_rc_mlx5_ep_tag_rndv_cancel(uct_ep_h tl_ep, void *op)
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/rc/accel/rc_mlx5_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ static uct_rc_iface_ops_t uct_rc_mlx5_iface_ops = {
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_rc_mlx5_ep_t),
.ep_get_address = uct_rc_mlx5_ep_get_address,
.ep_connect_to_ep = uct_rc_mlx5_ep_connect_to_ep,
.ep_enable_keep_alive = uct_rc_mlx5_ep_enable_keep_alive,
#if IBV_HW_TM
.ep_tag_eager_short = uct_rc_mlx5_ep_tag_eager_short,
.ep_tag_eager_bcopy = uct_rc_mlx5_ep_tag_eager_bcopy,
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/rc/verbs/rc_verbs_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ static uct_rc_iface_ops_t uct_rc_verbs_iface_ops = {
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_rc_verbs_ep_t),
.ep_get_address = uct_rc_verbs_ep_get_address,
.ep_connect_to_ep = uct_rc_verbs_ep_connect_to_ep,
.ep_enable_keep_alive = (uct_ep_enable_keep_alive_func_t)ucs_empty_function_return_unsupported,
.iface_flush = uct_rc_iface_flush,
.iface_fence = uct_rc_iface_fence,
.iface_progress_enable = uct_rc_verbs_iface_common_progress_enable,
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/rdmacm/rdmacm_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ static uct_cm_ops_t uct_rdmacm_cm_ops = {
static uct_iface_ops_t uct_rdmacm_cm_iface_ops = {
.ep_pending_purge = ucs_empty_function,
.ep_disconnect = uct_rdmacm_cm_ep_disconnect,
.ep_enable_keep_alive = (uct_ep_enable_keep_alive_func_t)ucs_empty_function_return_unsupported,
.cm_ep_conn_notify = uct_rdmacm_cm_ep_conn_notify,
.ep_query = uct_rdmacm_ep_query,
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_rdmacm_cm_ep_t),
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/ud/accel/ud_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,7 @@ static uct_ud_iface_ops_t uct_ud_mlx5_iface_ops = {
.ep_destroy = uct_ud_ep_disconnect ,
.ep_get_address = uct_ud_ep_get_address,
.ep_connect_to_ep = uct_ud_mlx5_ep_connect_to_ep,
.ep_enable_keep_alive = (uct_ep_enable_keep_alive_func_t)ucs_empty_function_return_unsupported,
.iface_flush = uct_ud_iface_flush,
.iface_fence = uct_base_iface_fence,
.iface_progress_enable = uct_ud_iface_progress_enable,
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/ud/verbs/ud_verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ static uct_ud_iface_ops_t uct_ud_verbs_iface_ops = {
.ep_destroy = uct_ud_ep_disconnect,
.ep_get_address = uct_ud_ep_get_address,
.ep_connect_to_ep = uct_ud_verbs_ep_connect_to_ep,
.ep_enable_keep_alive = (uct_ep_enable_keep_alive_func_t)ucs_empty_function_return_unsupported,
.iface_flush = uct_ud_iface_flush,
.iface_fence = uct_base_iface_fence,
.iface_progress_enable = uct_ud_iface_progress_enable,
Expand Down
1 change: 1 addition & 0 deletions src/uct/tcp/tcp_sockcm.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ static uct_iface_ops_t uct_tcp_sockcm_iface_ops = {
.ep_pending_purge = (uct_ep_pending_purge_func_t)ucs_empty_function,
.ep_disconnect = uct_tcp_sockcm_ep_disconnect,
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_sockcm_ep_t),
.ep_enable_keep_alive = (uct_ep_enable_keep_alive_func_t)ucs_empty_function_return_unsupported,
.ep_put_short = (uct_ep_put_short_func_t)ucs_empty_function_return_unsupported,
.ep_put_bcopy = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_unsupported,
.ep_get_bcopy = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_unsupported,
Expand Down
Loading

0 comments on commit 24b2bb9

Please sign in to comment.