Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP: tm offload rndv support #1549

Merged
merged 5 commits into from
Jun 1, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ struct ucp_request {
uint64_t value;
void *result;
} amo;
struct {
void *rndv_op; /* Handler of issued rndv send. Needs to cancel the
operation if it is completed by SW */
} tag_offload;

};

ucp_lane_index_t lane; /* Lane on which this request is being sent */
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker,
iface_params.stats_root = UCS_STATS_RVAL(worker->stats);
iface_params.rx_headroom = rx_headroom;
iface_params.cpu_mask = *cpu_mask_param;
iface_params.eager_arg = worker;
iface_params.eager_arg = iface_params.rndv_arg = worker;
iface_params.eager_cb = ucp_tag_offload_unexp_eager;
iface_params.rndv_cb = ucp_tag_offload_unexp_rndv;

/* Open UCT interface */
status = uct_iface_open(context->tl_mds[resource->md_index].md, worker->uct,
Expand Down
4 changes: 3 additions & 1 deletion src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ static inline ucp_ep_h ucp_worker_ep_find(ucp_worker_h worker, uint64_t dest_uui
static UCS_F_ALWAYS_INLINE
uint64_t ucp_worker_is_tl_tag_offload(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
return 0; /* Stub for now, offload TM RNDV is not implemented yet */
return (worker->ifaces[rsc_index].attr.cap.flags &
(UCT_IFACE_FLAG_TAG_EAGER_SHORT | UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
UCT_IFACE_FLAG_TAG_EAGER_ZCOPY | UCT_IFACE_FLAG_TAG_RNDV_ZCOPY));
}

#endif
2 changes: 1 addition & 1 deletion src/ucp/tag/eager_snd.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ static ucs_status_t ucp_tag_eager_zcopy_multi(uct_pending_req_t *self)
}

void ucp_tag_eager_zcopy_completion(uct_completion_t *self,
ucs_status_t status)
ucs_status_t status)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct_comp);
ucp_tag_eager_zcopy_req_complete(req);
Expand Down
136 changes: 136 additions & 0 deletions src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,66 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag,
ucp_request_complete_recv(req, status);
}

/* RNDV request matched by the transport. Need to proceed with AM based RNDV */
void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag,
const void *header, unsigned header_length,
ucs_status_t status)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx);
ucp_context_t *ctx = req->recv.worker->context;
ucp_sw_rndv_hdr_t *sreq = (ucp_sw_rndv_hdr_t*)header;
ucp_worker_iface_t *iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces,
ucp_worker_iface_t, queue);
ucp_rndv_rts_hdr_t rts;

rts.sreq = sreq->super;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls add comment like "emulate rts without key"

rts.super.tag = stag;
rts.flags = 0;
rts.address = 0; /* RNDV needs to be completed in SW */
rts.size = sreq->length;

ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype,
&req->recv.state);
/* coverity[address_of] */
ucp_rndv_matched(req->recv.worker, req, &rts);
}

ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags,
uint64_t stag, const void *hdr,
unsigned hdr_length,
uint64_t remote_addr, size_t length,
const void *rkey_buf)
{
ucp_rndv_rts_hdr_t *rts = (ucp_rndv_rts_hdr_t*)(((ucp_tag_hdr_t*)hdr) - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this illegal to access memory before 'hdr'?

ucp_worker_t *worker = arg;
void *rkey = rts + 1;
size_t len = sizeof(*rts);
ucp_ep_t *ep = ucp_worker_get_reply_ep(worker, rts->sreq.sender_uuid);
const uct_md_attr_t *md_attrs;
size_t rkey_size;

/* rts.req should be alredy in place - it is sent by the sender.
* Fill the rest of rts header and pass to common rts handler */
if (rkey_buf) {
/* Copy rkey before to fill rts, to avoid overriding rkey */
md_attrs = ucp_ep_md_attr(ep, ucp_ep_get_tag_lane(ep));
rkey_size = md_attrs->rkey_packed_size;
memcpy(rkey, rkey_buf, rkey_size);
len += rkey_size;
rts->flags = UCP_RNDV_RTS_FLAG_PACKED_RKEY | UCP_RNDV_RTS_FLAG_OFFLOAD;
rts->size = length;
} else {
ucs_assert(remote_addr == 0ul);
/* This must be SW RNDV request. Take length from its header. */
rts->size = ((ucp_sw_rndv_hdr_t*)hdr)->length;
}

rts->super.tag = stag;
rts->address = remote_addr;

return ucp_rndv_rts_handler(arg, rts, len, flags, UCP_RECV_DESC_FLAG_OFFLOAD);
}

void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force)
{
ucp_worker_iface_t *ucp_iface;
Expand Down Expand Up @@ -99,6 +159,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req)

req->recv.uct_ctx.tag_consumed_cb = ucp_tag_offload_tag_consumed;
req->recv.uct_ctx.completed_cb = ucp_tag_offload_completed;
req->recv.uct_ctx.rndv_cb = ucp_tag_offload_rndv_cb;

iov.buffer = (void*)req->recv.buffer;
iov.length = length;
Expand Down Expand Up @@ -213,6 +274,81 @@ static ucs_status_t ucp_tag_offload_eager_zcopy(uct_pending_req_t *self)
return ucp_do_tag_offload_zcopy(self, 0ul, ucp_tag_eager_zcopy_req_complete);
}

ucs_status_t ucp_tag_offload_sw_rndv(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_t *ep = req->send.ep;
ucp_sw_rndv_hdr_t rndv_hdr = {
.super.sender_uuid = req->send.ep->worker->uuid,
.super.reqptr = (uintptr_t)req,
.length = req->send.length
};

return uct_ep_tag_rndv_request(ep->uct_eps[req->send.lane], req->send.tag,
&rndv_hdr, sizeof(rndv_hdr));
}

ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self)
{
void *rndv_op;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls move this variable to be last, so variables with initializers are first

ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_t *ep = req->send.ep;
size_t max_iov = ucp_ep_config(ep)->tag.eager.max_iov;
uct_iov_t *iov = ucs_alloca(max_iov * sizeof(uct_iov_t));
size_t iovcnt = 0;
ucp_request_hdr_t rndv_hdr = {
.sender_uuid = ep->worker->uuid,
.reqptr = (uintptr_t)req
};

req->send.uct_comp.count = 1;
req->send.uct_comp.func = ucp_tag_eager_zcopy_completion;

ucs_assert_always(UCP_DT_IS_CONTIG(req->send.datatype));
ucp_dt_iov_copy_uct(iov, &iovcnt, max_iov, &req->send.state, req->send.buffer,
req->send.datatype, req->send.length);

rndv_op = uct_ep_tag_rndv_zcopy(ep->uct_eps[req->send.lane], req->send.tag,
&rndv_hdr, sizeof(rndv_hdr), iov, iovcnt,
&req->send.uct_comp);
if (UCS_PTR_IS_ERR(rndv_op)) {
return UCS_PTR_STATUS(rndv_op);
}
req->flags |= UCP_REQUEST_FLAG_OFFLOADED;
req->send.tag_offload.rndv_op = rndv_op;
return UCS_OK;
}

void ucp_tag_offload_cancel_rndv(ucp_request_t *req)
{
ucp_ep_t *ep = req->send.ep;
ucs_status_t status;

status = uct_ep_tag_rndv_cancel(ep->uct_eps[ucp_ep_get_tag_lane(ep)],
req->send.tag_offload.rndv_op);
if (status != UCS_OK) {
ucs_error("Failed to cancel tag rndv op %s", ucs_status_string(status));
}
}

ucs_status_t ucp_tag_offload_start_rndv(ucp_request_t *sreq)
{
ucs_status_t status;
ucp_lane_index_t lane = ucp_ep_get_tag_lane(sreq->send.ep);

sreq->send.lane = lane;
if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
status = ucp_request_send_buffer_reg(sreq, lane);
if (status != UCS_OK) {
return status;
}
sreq->send.uct.func = ucp_tag_offload_rndv_zcopy;
} else {
sreq->send.uct.func = ucp_tag_offload_sw_rndv;
}
return UCS_OK;
}

const ucp_proto_t ucp_tag_offload_proto = {
.contig_short = ucp_tag_offload_eager_short,
.bcopy_single = ucp_tag_offload_eager_bcopy,
Expand Down
18 changes: 18 additions & 0 deletions src/ucp/tag/offload.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,31 @@
#include <ucp/proto/proto.h>


typedef struct ucp_sw_rndv_hdr {
ucp_request_hdr_t super;
size_t length;
} UCS_S_PACKED ucp_sw_rndv_hdr_t;


extern const ucp_proto_t ucp_tag_offload_proto;

extern const ucp_proto_t ucp_tag_offload_sync_proto;

ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self);

void ucp_tag_offload_cancel_rndv(ucp_request_t *req);

ucs_status_t ucp_tag_offload_start_rndv(ucp_request_t *sreq);

ucs_status_t ucp_tag_offload_unexp_eager(void *arg, void *data, size_t length,
unsigned flags, uct_tag_t stag, uint64_t imm);


ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, uint64_t stag,
const void *hdr, unsigned hdr_length,
uint64_t remote_addr, size_t length,
const void *rkey_buf);

void ucp_tag_offload_cancel(ucp_context_t *context, ucp_request_t *req, int force);

int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req);
Expand Down
63 changes: 51 additions & 12 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "rndv.h"
#include "tag_match.inl"

#include "offload.h"
#include <ucp/proto/proto_am.inl>
#include <ucp/core/ucp_request.inl>
#include <ucs/datastruct/queue.h>
Expand Down Expand Up @@ -121,8 +122,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rtr, (self),
return status;
}

void ucp_tag_send_start_rndv(ucp_request_t *sreq)
ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)
{
ucs_status_t status;

ucs_trace_req("starting rndv. sreq: %p. buffer: %p, length: %zu",
sreq, sreq->send.buffer, sreq->send.length);
sreq->flags |= UCP_REQUEST_FLAG_RNDV;
Expand All @@ -132,8 +135,15 @@ void ucp_tag_send_start_rndv(ucp_request_t *sreq)
if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL;
}

sreq->send.uct.func = ucp_proto_progress_rndv_rts;
if (sreq->send.ep->flags & UCP_EP_FLAG_TAG_OFFLOAD_ENABLED) {
status = ucp_tag_offload_start_rndv(sreq);
if (status != UCS_OK) {
return status;
}
} else {
sreq->send.uct.func = ucp_proto_progress_rndv_rts;
}
return UCS_OK;
}

static void ucp_rndv_send_ats(ucp_request_t *rndv_req, uintptr_t remote_request)
Expand Down Expand Up @@ -398,7 +408,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
}

if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) {
if ((rndv_rts_hdr->address != 0) && ucp_ep_is_rndv_lane_present(ep)) {
if ((rndv_rts_hdr->address != 0) && (ucp_ep_is_rndv_lane_present(ep) ||
(rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_OFFLOAD))) {
/* read the data from the sender with a get_zcopy operation on the
* rndv lane */
ucp_rndv_handle_recv_contig(rndv_req, rreq, rndv_rts_hdr);
Expand All @@ -421,12 +432,14 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler,
(arg, data, length, am_flags),
void *arg, void *data, size_t length, unsigned am_flags)
(arg, data, length, tl_flags, desc_flags),
void *arg, void *data, size_t length, unsigned tl_flags,
unsigned desc_flags)
{
const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST |
UCP_RECV_DESC_FLAG_LAST |
UCP_RECV_DESC_FLAG_RNDV;
UCP_RECV_DESC_FLAG_LAST |
UCP_RECV_DESC_FLAG_RNDV |
desc_flags;
ucp_worker_h worker = arg;
ucp_rndv_rts_hdr_t *rndv_rts_hdr = data;
ucp_context_h context = worker->context;
Expand All @@ -440,16 +453,27 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler,
if (rreq != NULL) {
ucp_rndv_matched(worker, rreq, rndv_rts_hdr);

/* Cancel req in transport if it was offloaded, because it arrived
as unexpected */
if (recv_flags & UCP_RECV_DESC_FLAG_OFFLOAD) {
ucp_tag_offload_cancel(context, rreq, 1);
}

UCP_WORKER_STAT_RNDV(worker, EXP);
status = UCS_OK;
} else {
status = ucp_tag_unexp_recv(&context->tm, worker, data, length, am_flags,
status = ucp_tag_unexp_recv(&context->tm, worker, data, length, tl_flags,
sizeof(*rndv_rts_hdr), recv_flags);
}

UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock);
return status;
}
ucs_status_t ucp_rndv_rts_handler_wrap(void *arg, void *data, size_t length,
unsigned tl_flags)
{
return ucp_rndv_rts_handler(arg, data, length, tl_flags, 0);
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler,
(arg, data, length, flags),
Expand All @@ -460,7 +484,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler,

/* dereg the original send request and set it to complete */
UCS_PROFILE_REQUEST_EVENT(sreq, "rndv_ats_recv", 0);
ucp_rndv_rma_request_send_buffer_dereg(sreq);
if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) {
ucp_tag_offload_cancel_rndv(sreq);
ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep));
} else {
ucp_rndv_rma_request_send_buffer_dereg(sreq);
}
ucp_request_send_generic_dt_finish(sreq);
ucp_request_complete_send(sreq, UCS_OK);
return UCS_OK;
Expand Down Expand Up @@ -591,7 +620,11 @@ static void ucp_rndv_prepare_zcopy_send_buffer(ucp_request_t *sreq, ucp_ep_h ep)
{
ucs_status_t status;

if ((ucp_ep_is_rndv_lane_present(ep)) &&
if ((sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) &&
(ucp_ep_get_am_lane(ep) != ucp_ep_get_tag_lane(ep))) {
ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep));
sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL;
} else if ((ucp_ep_is_rndv_lane_present(ep)) &&
(ucp_ep_get_am_lane(ep) != ucp_ep_get_rndv_get_lane(ep))) {
/* dereg the original send request since we are going to send on the AM lane next */
ucp_rndv_rma_request_send_buffer_dereg(sreq);
Expand Down Expand Up @@ -636,6 +669,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rtr_handler,
ucs_assert_always(!ucp_ep_is_stub(ep));
ucs_trace_req("RTR received. start sending on sreq %p", sreq);

if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) {
/* Do not deregister memory here, because am zcopy rndv may
* need it registered (if am and tag is the same lane). */
ucp_tag_offload_cancel_rndv(sreq);
}

if ((UCP_DT_IS_CONTIG(sreq->send.datatype)) &&
(sreq->send.length >= ucp_ep_config(ep)->am.zcopy_thresh[0])) {
/* send with zcopy */
Expand Down Expand Up @@ -751,7 +790,7 @@ static void ucp_rndv_dump(ucp_worker_h worker, uct_am_trace_type_t type,
}
}

UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTS, ucp_rndv_rts_handler,
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTS, ucp_rndv_rts_handler_wrap,
ucp_rndv_dump, UCT_AM_CB_FLAG_SYNC);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_ATS, ucp_rndv_ats_handler,
ucp_rndv_dump, UCT_AM_CB_FLAG_SYNC);
Expand Down
Loading