Skip to content

Commit

Permalink
Merge pull request #1549 from alex-mikheev/topic/ucp_tm_rndv_offload
Browse files Browse the repository at this point in the history
UCP: tm offload rndv support
  • Loading branch information
yosefe authored Jun 1, 2017
2 parents f0c13bd + 2ba0e9a commit 4962c52
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 25 deletions.
5 changes: 5 additions & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ struct ucp_request {
uint64_t value;
void *result;
} amo;
struct {
void *rndv_op; /* Handler of issued rndv send. Needs to cancel the
operation if it is completed by SW */
} tag_offload;

};

ucp_lane_index_t lane; /* Lane on which this request is being sent */
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker,
iface_params.stats_root = UCS_STATS_RVAL(worker->stats);
iface_params.rx_headroom = rx_headroom;
iface_params.cpu_mask = *cpu_mask_param;
iface_params.eager_arg = worker;
iface_params.eager_arg = iface_params.rndv_arg = worker;
iface_params.eager_cb = ucp_tag_offload_unexp_eager;
iface_params.rndv_cb = ucp_tag_offload_unexp_rndv;

/* Open UCT interface */
status = uct_iface_open(context->tl_mds[resource->md_index].md, worker->uct,
Expand Down
4 changes: 3 additions & 1 deletion src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ static inline ucp_ep_h ucp_worker_ep_find(ucp_worker_h worker, uint64_t dest_uui
static UCS_F_ALWAYS_INLINE
uint64_t ucp_worker_is_tl_tag_offload(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
return 0; /* Stub for now, offload TM RNDV is not implemented yet */
return (worker->ifaces[rsc_index].attr.cap.flags &
(UCT_IFACE_FLAG_TAG_EAGER_SHORT | UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
UCT_IFACE_FLAG_TAG_EAGER_ZCOPY | UCT_IFACE_FLAG_TAG_RNDV_ZCOPY));
}

#endif
2 changes: 2 additions & 0 deletions src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,
* because it arrived as unexpected */
if (flags & UCP_RECV_DESC_FLAG_OFFLOAD) {
ucp_tag_offload_cancel(context, req, 1);
} else {
ucs_assert(!(req->flags & UCP_REQUEST_FLAG_OFFLOADED));
}

if (flags & UCP_RECV_DESC_FLAG_LAST) {
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/eager_snd.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ static ucs_status_t ucp_tag_eager_zcopy_multi(uct_pending_req_t *self)
}

void ucp_tag_eager_zcopy_completion(uct_completion_t *self,
ucs_status_t status)
ucs_status_t status)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct_comp);
ucp_tag_eager_zcopy_req_complete(req);
Expand Down
150 changes: 148 additions & 2 deletions src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,88 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag,
ucp_request_complete_recv(req, status);
}

/* RNDV request matched by the transport. Need to proceed with AM based RNDV */
void ucp_tag_offload_rndv_cb(uct_tag_context_t *self, uct_tag_t stag,
const void *header, unsigned header_length,
ucs_status_t status)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, recv.uct_ctx);
ucp_context_t *ctx = req->recv.worker->context;
ucp_sw_rndv_hdr_t *sreq = (ucp_sw_rndv_hdr_t*)header;
ucp_worker_iface_t *iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces,
ucp_worker_iface_t, queue);
ucp_rndv_rts_hdr_t rts;

/* Emulate RTS without rkey (to be handled as AM-based RNDV). */
rts.sreq = sreq->super;
rts.super.tag = stag;
rts.flags = 0;
rts.address = 0; /* RNDV needs to be completed in SW */
rts.size = sreq->length;

ucp_request_memory_dereg(ctx, iface->rsc_index, req->recv.datatype,
&req->recv.state);
/* coverity[address_of] */
ucp_rndv_matched(req->recv.worker, req, &rts);
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_tag_offload_unexp_rndv,
(arg, flags, stag, hdr, hdr_length, remote_addr, length, rkey_buf),
void *arg, unsigned flags, uint64_t stag, const void *hdr,
unsigned hdr_length, uint64_t remote_addr, size_t length,
const void *rkey_buf)
{
ucp_worker_t *worker = arg;
ucp_request_hdr_t *rndv_hdr = (ucp_request_hdr_t*)hdr;
ucp_ep_t *ep = ucp_worker_get_reply_ep(worker, rndv_hdr->sender_uuid);
const uct_md_attr_t *md_attr = ucp_ep_md_attr(ep, ucp_ep_get_tag_lane(ep));
size_t rkey_size = rkey_buf ? md_attr->rkey_packed_size : 0;
size_t len = sizeof(ucp_rndv_rts_hdr_t) + rkey_size;
ucp_rndv_rts_hdr_t *rts = ucs_alloca(len);
ucp_sw_rndv_hdr_t *sw_rndv_hdr;

/* Fill RTS to emulate SW RNDV flow. */
rts->super.tag = stag;
rts->sreq = *rndv_hdr;
rts->address = remote_addr;

if (remote_addr) {
rts->size = length;
rts->flags = UCP_RNDV_RTS_FLAG_OFFLOAD;
if (rkey_buf) {
memcpy(rts + 1, rkey_buf, rkey_size);
len += rkey_size;
rts->flags |= UCP_RNDV_RTS_FLAG_PACKED_RKEY;
}
} else {
/* This must be SW RNDV request. Take length from its header. */
sw_rndv_hdr = ucs_derived_of(hdr, ucp_sw_rndv_hdr_t);
rts->size = sw_rndv_hdr->length;
rts->flags = 0;
}

/* Pass 0 as tl flags, because RTS needs to be stored in UCP pool. */
ucp_rndv_process_rts(arg, rts, len, 0);

/* Always return UCS_OK, since RNDV hdr should be stored in UCP mpool. */
return UCS_OK;
}

void ucp_tag_offload_cancel(ucp_context_t *ctx, ucp_request_t *req, int force)
{
ucp_worker_iface_t *ucp_iface;
ucs_status_t status;

ucs_assert(req->flags & UCP_REQUEST_FLAG_OFFLOADED);
if (!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) {
return;
}

ucp_iface = ucs_queue_head_elem_non_empty(&ctx->tm.offload_ifaces,
ucp_worker_iface_t, queue);
ucp_request_memory_dereg(ctx, ucp_iface->rsc_index, req->recv.datatype,
&req->recv.state);
status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx, force);
status = uct_iface_tag_recv_cancel(ucp_iface->iface, &req->recv.uct_ctx,
force);
if (status != UCS_OK) {
ucs_error("Failed to cancel recv in the transport: %s",
ucs_status_string(status));
Expand Down Expand Up @@ -99,6 +169,7 @@ int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req)

req->recv.uct_ctx.tag_consumed_cb = ucp_tag_offload_tag_consumed;
req->recv.uct_ctx.completed_cb = ucp_tag_offload_completed;
req->recv.uct_ctx.rndv_cb = ucp_tag_offload_rndv_cb;

iov.buffer = (void*)req->recv.buffer;
iov.length = length;
Expand Down Expand Up @@ -213,6 +284,81 @@ static ucs_status_t ucp_tag_offload_eager_zcopy(uct_pending_req_t *self)
return ucp_do_tag_offload_zcopy(self, 0ul, ucp_tag_eager_zcopy_req_complete);
}

ucs_status_t ucp_tag_offload_sw_rndv(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_t *ep = req->send.ep;
ucp_sw_rndv_hdr_t rndv_hdr = {
.super.sender_uuid = req->send.ep->worker->uuid,
.super.reqptr = (uintptr_t)req,
.length = req->send.length
};

return uct_ep_tag_rndv_request(ep->uct_eps[req->send.lane], req->send.tag,
&rndv_hdr, sizeof(rndv_hdr));
}

ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_t *ep = req->send.ep;
size_t max_iov = ucp_ep_config(ep)->tag.eager.max_iov;
uct_iov_t *iov = ucs_alloca(max_iov * sizeof(uct_iov_t));
size_t iovcnt = 0;
ucp_request_hdr_t rndv_hdr = {
.sender_uuid = ep->worker->uuid,
.reqptr = (uintptr_t)req
};
void *rndv_op;

req->send.uct_comp.count = 1;
req->send.uct_comp.func = ucp_tag_eager_zcopy_completion;

ucs_assert_always(UCP_DT_IS_CONTIG(req->send.datatype));
ucp_dt_iov_copy_uct(iov, &iovcnt, max_iov, &req->send.state, req->send.buffer,
req->send.datatype, req->send.length);

rndv_op = uct_ep_tag_rndv_zcopy(ep->uct_eps[req->send.lane], req->send.tag,
&rndv_hdr, sizeof(rndv_hdr), iov, iovcnt,
&req->send.uct_comp);
if (UCS_PTR_IS_ERR(rndv_op)) {
return UCS_PTR_STATUS(rndv_op);
}
req->flags |= UCP_REQUEST_FLAG_OFFLOADED;
req->send.tag_offload.rndv_op = rndv_op;
return UCS_OK;
}

void ucp_tag_offload_cancel_rndv(ucp_request_t *req)
{
ucp_ep_t *ep = req->send.ep;
ucs_status_t status;

status = uct_ep_tag_rndv_cancel(ep->uct_eps[ucp_ep_get_tag_lane(ep)],
req->send.tag_offload.rndv_op);
if (status != UCS_OK) {
ucs_error("Failed to cancel tag rndv op %s", ucs_status_string(status));
}
}

ucs_status_t ucp_tag_offload_start_rndv(ucp_request_t *sreq)
{
ucs_status_t status;
ucp_lane_index_t lane = ucp_ep_get_tag_lane(sreq->send.ep);

sreq->send.lane = lane;
if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
status = ucp_request_send_buffer_reg(sreq, lane);
if (status != UCS_OK) {
return status;
}
sreq->send.uct.func = ucp_tag_offload_rndv_zcopy;
} else {
sreq->send.uct.func = ucp_tag_offload_sw_rndv;
}
return UCS_OK;
}

const ucp_proto_t ucp_tag_offload_proto = {
.contig_short = ucp_tag_offload_eager_short,
.bcopy_single = ucp_tag_offload_eager_bcopy,
Expand Down
18 changes: 18 additions & 0 deletions src/ucp/tag/offload.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,31 @@
#include <ucp/proto/proto.h>


typedef struct ucp_sw_rndv_hdr {
ucp_request_hdr_t super;
size_t length;
} UCS_S_PACKED ucp_sw_rndv_hdr_t;


extern const ucp_proto_t ucp_tag_offload_proto;

extern const ucp_proto_t ucp_tag_offload_sync_proto;

ucs_status_t ucp_tag_offload_rndv_zcopy(uct_pending_req_t *self);

void ucp_tag_offload_cancel_rndv(ucp_request_t *req);

ucs_status_t ucp_tag_offload_start_rndv(ucp_request_t *sreq);

ucs_status_t ucp_tag_offload_unexp_eager(void *arg, void *data, size_t length,
unsigned flags, uct_tag_t stag, uint64_t imm);


ucs_status_t ucp_tag_offload_unexp_rndv(void *arg, unsigned flags, uint64_t stag,
const void *hdr, unsigned hdr_length,
uint64_t remote_addr, size_t length,
const void *rkey_buf);

void ucp_tag_offload_cancel(ucp_context_t *context, ucp_request_t *req, int force);

int ucp_tag_offload_post(ucp_context_t *ctx, ucp_request_t *req);
Expand Down
58 changes: 47 additions & 11 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "rndv.h"
#include "tag_match.inl"

#include "offload.h"
#include <ucp/proto/proto_am.inl>
#include <ucp/core/ucp_request.inl>
#include <ucs/datastruct/queue.h>
Expand Down Expand Up @@ -121,8 +122,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rtr, (self),
return status;
}

void ucp_tag_send_start_rndv(ucp_request_t *sreq)
ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)
{
ucs_status_t status;

ucs_trace_req("starting rndv. sreq: %p. buffer: %p, length: %zu",
sreq, sreq->send.buffer, sreq->send.length);
sreq->flags |= UCP_REQUEST_FLAG_RNDV;
Expand All @@ -132,8 +135,15 @@ void ucp_tag_send_start_rndv(ucp_request_t *sreq)
if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL;
}

sreq->send.uct.func = ucp_proto_progress_rndv_rts;
if (sreq->send.ep->flags & UCP_EP_FLAG_TAG_OFFLOAD_ENABLED) {
status = ucp_tag_offload_start_rndv(sreq);
if (status != UCS_OK) {
return status;
}
} else {
sreq->send.uct.func = ucp_proto_progress_rndv_rts;
}
return UCS_OK;
}

static void ucp_rndv_send_ats(ucp_request_t *rndv_req, uintptr_t remote_request)
Expand Down Expand Up @@ -396,7 +406,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
}

if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) {
if ((rndv_rts_hdr->address != 0) && ucp_ep_is_rndv_lane_present(ep)) {
if ((rndv_rts_hdr->address != 0) && (ucp_ep_is_rndv_lane_present(ep) ||
(rndv_rts_hdr->flags & UCP_RNDV_RTS_FLAG_OFFLOAD))) {
/* read the data from the sender with a get_zcopy operation on the
* rndv lane */
ucp_rndv_handle_recv_contig(rndv_req, rreq, rndv_rts_hdr);
Expand All @@ -418,12 +429,11 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
UCS_ASYNC_UNBLOCK(&worker->async);
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler,
(arg, data, length, am_flags),
void *arg, void *data, size_t length, unsigned am_flags)
ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length,
unsigned tl_flags)
{
const unsigned recv_flags = UCP_RECV_DESC_FLAG_FIRST |
UCP_RECV_DESC_FLAG_LAST |
UCP_RECV_DESC_FLAG_LAST |
UCP_RECV_DESC_FLAG_RNDV;
ucp_worker_h worker = arg;
ucp_rndv_rts_hdr_t *rndv_rts_hdr = data;
Expand All @@ -438,17 +448,28 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler,
if (rreq != NULL) {
ucp_rndv_matched(worker, rreq, rndv_rts_hdr);

/* Cancel req in transport if it was offloaded, because it arrived
as unexpected */
ucp_tag_offload_cancel(context, rreq, 1);

UCP_WORKER_STAT_RNDV(worker, EXP);
status = UCS_OK;
} else {
status = ucp_tag_unexp_recv(&context->tm, worker, data, length, am_flags,
status = ucp_tag_unexp_recv(&context->tm, worker, data, length, tl_flags,
sizeof(*rndv_rts_hdr), recv_flags);
}

UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock);
return status;
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler,
(arg, data, length, tl_flags),
void *arg, void *data, size_t length, unsigned tl_flags)
{
return ucp_rndv_process_rts(arg, data, length, tl_flags);
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler,
(arg, data, length, flags),
void *arg, void *data, size_t length, unsigned flags)
Expand All @@ -458,7 +479,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler,

/* dereg the original send request and set it to complete */
UCS_PROFILE_REQUEST_EVENT(sreq, "rndv_ats_recv", 0);
ucp_rndv_rma_request_send_buffer_dereg(sreq);
if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) {
ucp_tag_offload_cancel_rndv(sreq);
ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep));
} else {
ucp_rndv_rma_request_send_buffer_dereg(sreq);
}
ucp_request_send_generic_dt_finish(sreq);
ucp_request_complete_send(sreq, UCS_OK);
return UCS_OK;
Expand Down Expand Up @@ -589,7 +615,11 @@ static void ucp_rndv_prepare_zcopy_send_buffer(ucp_request_t *sreq, ucp_ep_h ep)
{
ucs_status_t status;

if ((ucp_ep_is_rndv_lane_present(ep)) &&
if ((sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) &&
(ucp_ep_get_am_lane(ep) != ucp_ep_get_tag_lane(ep))) {
ucp_request_send_buffer_dereg(sreq, ucp_ep_get_tag_lane(sreq->send.ep));
sreq->send.state.dt.contig.memh = UCT_MEM_HANDLE_NULL;
} else if ((ucp_ep_is_rndv_lane_present(ep)) &&
(ucp_ep_get_am_lane(ep) != ucp_ep_get_rndv_get_lane(ep))) {
/* dereg the original send request since we are going to send on the AM lane next */
ucp_rndv_rma_request_send_buffer_dereg(sreq);
Expand Down Expand Up @@ -634,6 +664,12 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rtr_handler,
ucs_assert_always(!ucp_ep_is_stub(ep));
ucs_trace_req("RTR received. start sending on sreq %p", sreq);

if (sreq->flags & UCP_REQUEST_FLAG_OFFLOADED) {
/* Do not deregister memory here, because am zcopy rndv may
* need it registered (if am and tag is the same lane). */
ucp_tag_offload_cancel_rndv(sreq);
}

if ((UCP_DT_IS_CONTIG(sreq->send.datatype)) &&
(sreq->send.length >= ucp_ep_config(ep)->am.zcopy_thresh[0])) {
/* send with zcopy */
Expand Down
Loading

0 comments on commit 4962c52

Please sign in to comment.