Skip to content

Commit

Permalink
UCP: tag matching offload eager support
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-mikheev committed May 24, 2017
1 parent 3e160da commit a44896d
Show file tree
Hide file tree
Showing 16 changed files with 418 additions and 34 deletions.
2 changes: 2 additions & 0 deletions src/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ noinst_HEADERS = \
tag/rndv.h \
tag/tag_match.h \
tag/tag_match.inl \
tag/offload.h \
wireup/address.h \
wireup/stub_ep.h \
wireup/wireup.h
Expand Down Expand Up @@ -66,6 +67,7 @@ libucp_la_SOURCES = \
tag/tag_match.c \
tag/tag_recv.c \
tag/tag_send.c \
tag/offload.c \
wireup/address.c \
wireup/select.c \
wireup/stub_ep.c \
Expand Down
5 changes: 3 additions & 2 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <ucp/wireup/stub_ep.h>
#include <ucp/wireup/wireup.h>
#include <ucp/tag/eager.h>
#include <ucp/tag/offload.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/sys/string.h>
Expand Down Expand Up @@ -809,8 +810,8 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)

config->tag.offload.max_recv_iov = iface_attr->cap.tag.recv.max_iov;
config->tag.offload.max_rndv_iov = iface_attr->cap.tag.rndv.max_iov;
config->tag.sync_proto = NULL;
config->tag.proto = NULL;
config->tag.sync_proto = &ucp_tag_offload_sync_proto;
config->tag.proto = &ucp_tag_offload_proto;
config->tag.lane = lane;
max_rndv_thresh = iface_attr->cap.tag.eager.max_zcopy;
max_am_rndv_thresh = iface_attr->cap.tag.eager.max_bcopy;
Expand Down
65 changes: 44 additions & 21 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "ucp_worker.h"
#include "ucp_request.inl"

#include <ucp/tag/tag_match.h>
#include <ucs/datastruct/mpool.inl>
#include <ucs/debug/debug.h>
#include <ucs/debug/log.h>
Expand Down Expand Up @@ -86,7 +85,10 @@ UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request),
UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock);

ucp_tag_exp_remove(&worker->context->tm, req);
ucp_request_complete_recv(req, UCS_ERR_CANCELED);
/* If the tag was posted to the transport, we need to wait for its completion */
if (!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) {
ucp_request_complete_recv(req, UCS_ERR_CANCELED);
}

UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock);
UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock);
Expand Down Expand Up @@ -166,25 +168,26 @@ void ucp_iov_buffer_memh_dereg(uct_md_h uct_md, uct_mem_h *memh,
}
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_request_send_buffer_reg, (req, lane),
ucp_request_t *req, ucp_lane_index_t lane)
ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
void *buffer, size_t length,
ucp_datatype_t datatype, ucp_dt_state_t *state)
{
uct_md_h uct_md = ucp_ep_md(req->send.ep, lane);
ucp_dt_state_t *state = &req->send.state;
ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index;
uct_md_h uct_md = context->tl_mds[mdi].md;
uct_md_attr_t *uct_md_attr;
size_t iov_it, iovcnt;
const ucp_dt_iov_t *iov;
uct_mem_h *memh;
ucs_status_t status;

status = UCS_OK;
switch (req->send.datatype & UCP_DATATYPE_CLASS_MASK) {
switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
status = uct_md_mem_reg(uct_md, (void *)req->send.buffer, req->send.length,
0, &state->dt.contig.memh);
status = uct_md_mem_reg(uct_md, buffer, length, 0, &state->dt.contig.memh);
break;
case UCP_DATATYPE_IOV:
iovcnt = state->dt.iov.iovcnt;
iov = req->send.buffer;
iov = buffer;
memh = ucs_malloc(sizeof(*memh) * iovcnt, "IOV memh");
if (NULL == memh) {
status = UCS_ERR_NO_MEMORY;
Expand All @@ -208,31 +211,30 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_send_buffer_reg, (req, lane),
break;
default:
status = UCS_ERR_INVALID_PARAM;
ucs_error("Invalid data type %lx", req->send.datatype);
ucs_error("Invalid data type %lx", datatype);
}

err:
if (status != UCS_OK) {
uct_md_attr = &context->tl_mds[mdi].attr;
ucs_error("failed to register user buffer [datatype=%lx address=%p "
"len=%zu pd=\"%s\"]: %s",
req->send.datatype, req->send.buffer, req->send.length,
ucp_ep_md_attr(req->send.ep, lane)->component_name,
ucs_status_string(status));
"len=%zu pd=\"%s\"]: %s", datatype, buffer, length,
uct_md_attr->component_name, ucs_status_string(status));
}
return status;
}

UCS_PROFILE_FUNC_VOID(ucp_request_send_buffer_dereg, (req, lane),
ucp_request_t *req, ucp_lane_index_t lane)
void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
ucp_datatype_t datatype, ucp_dt_state_t *state)
{
uct_md_h uct_md = ucp_ep_md(req->send.ep, lane);
ucp_dt_state_t *state = &req->send.state;
ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index;
uct_md_h uct_md = context->tl_mds[mdi].md;
uct_mem_h *memh;
size_t iov_it;

switch (req->send.datatype & UCP_DATATYPE_CLASS_MASK) {
switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
if (req->send.state.dt.contig.memh != UCT_MEM_HANDLE_NULL) {
if (state->dt.contig.memh != UCT_MEM_HANDLE_NULL) {
uct_md_mem_dereg(uct_md, state->dt.contig.memh);
}
break;
Expand All @@ -250,3 +252,24 @@ UCS_PROFILE_FUNC_VOID(ucp_request_send_buffer_dereg, (req, lane),
}
}

/*
 * Register the send buffer of a request for the given lane.
 *
 * Thin wrapper around ucp_request_memory_reg(): the context is taken from
 * the worker of the request's endpoint, and the resource index is looked up
 * from the endpoint and lane. The request's send buffer, length, datatype
 * and datatype state are forwarded unchanged.
 *
 * Returns whatever ucp_request_memory_reg() returns.
 */
UCS_PROFILE_FUNC(ucs_status_t, ucp_request_send_buffer_reg, (req, lane),
                 ucp_request_t *req, ucp_lane_index_t lane)
{
    ucp_rsc_index_t rsc = ucp_ep_get_rsc_index(req->send.ep, lane);
    ucs_status_t status;

    status = ucp_request_memory_reg(req->send.ep->worker->context, rsc,
                                    (void*)req->send.buffer, req->send.length,
                                    req->send.datatype, &req->send.state);
    return status;
}

/*
 * Deregister the send buffer of a request for the given lane.
 *
 * Thin wrapper around ucp_request_memory_dereg(): the context is taken from
 * the worker of the request's endpoint, and the resource index is looked up
 * from the endpoint and lane. The request's send datatype and datatype state
 * are forwarded unchanged.
 */
UCS_PROFILE_FUNC_VOID(ucp_request_send_buffer_dereg, (req, lane),
                      ucp_request_t *req, ucp_lane_index_t lane)
{
    ucp_context_t *context    = req->send.ep->worker->context;
    ucp_rsc_index_t rsc_index = ucp_ep_get_rsc_index(req->send.ep, lane);

    /* This function returns void, so call the helper as a plain statement:
     * "return <expr>;" is not allowed in a void function in ISO C. */
    ucp_request_memory_dereg(context, rsc_index, req->send.datatype,
                             &req->send.state);
}

17 changes: 16 additions & 1 deletion src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ enum {
UCP_REQUEST_FLAG_RECV = UCS_BIT(7),
UCP_REQUEST_FLAG_SYNC = UCS_BIT(8),
UCP_REQUEST_FLAG_RNDV = UCS_BIT(9),
UCP_REQUEST_FLAG_MATCHED = UCS_BIT(10),
UCP_REQUEST_FLAG_OFFLOADED = UCS_BIT(11),
UCP_REQUEST_FLAG_BLOCK_OFFLOAD = UCS_BIT(12),

#if ENABLE_ASSERT
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = UCS_BIT(15)
Expand All @@ -51,7 +54,8 @@ enum {
UCP_RECV_DESC_FLAG_EAGER = UCS_BIT(2),
UCP_RECV_DESC_FLAG_SYNC = UCS_BIT(3),
UCP_RECV_DESC_FLAG_RNDV = UCS_BIT(4),
UCP_RECV_DESC_FLAG_UCT_DESC = UCS_BIT(5)
UCP_RECV_DESC_FLAG_UCT_DESC = UCS_BIT(5),
UCP_RECV_DESC_FLAG_OFFLOAD = UCS_BIT(6)
};


Expand Down Expand Up @@ -143,6 +147,10 @@ struct ucp_request {
ucp_tag_recv_callback_t cb; /* Completion callback */
ucp_tag_recv_info_t info; /* Completion info to fill */
ucp_dt_state_t state;
ucp_worker_t *worker;

/* Transport offload context */
uct_tag_context_t uct_ctx;
} recv;
};
};
Expand Down Expand Up @@ -170,4 +178,11 @@ ucs_status_t ucp_request_send_buffer_reg(ucp_request_t *req, ucp_lane_index_t la

void ucp_request_send_buffer_dereg(ucp_request_t *req, ucp_lane_index_t lane);

ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
void *buffer, size_t length,
ucp_datatype_t datatype, ucp_dt_state_t *state);

void ucp_request_memory_dereg(ucp_context_t *context, ucp_rsc_index_t rsc_index,
ucp_datatype_t datatype, ucp_dt_state_t *state);

#endif
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ ucp_request_complete_recv(ucp_request_t *req, ucs_status_t status)
req->recv.info.sender_tag, req->recv.info.length,
ucs_status_string(status));
UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);

if (req->flags & UCP_REQUEST_FLAG_BLOCK_OFFLOAD) {
--req->recv.worker->context->tm.sw_req_count;
}
ucp_request_complete(req, recv.cb, status, &req->recv.info);
}

Expand Down
7 changes: 6 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ucp/wireup/address.h>
#include <ucp/wireup/stub_ep.h>
#include <ucp/tag/eager.h>
#include <ucp/tag/offload.h>
#include <ucs/datastruct/mpool.inl>
#include <ucs/datastruct/queue.h>
#include <ucs/type/cpu_set.h>
Expand Down Expand Up @@ -254,6 +255,8 @@ static ucs_status_t ucp_worker_add_iface(ucp_worker_h worker,
iface_params.stats_root = UCS_STATS_RVAL(worker->stats);
iface_params.rx_headroom = rx_headroom;
iface_params.cpu_mask = *cpu_mask_param;
iface_params.eager_arg = worker;
iface_params.eager_cb = ucp_tag_offload_unexp_eager;

/* Open UCT interface */
status = uct_iface_open(context->tl_mds[resource->md_index].md, worker->uct,
Expand Down Expand Up @@ -538,7 +541,9 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
ucs_cpu_set_t empty_cpu_mask;
ucs_thread_mode_t thread_mode;
ucp_wakeup_event_t events;
const size_t rx_headroom = sizeof(ucp_recv_desc_t);

/* Space for eager header is needed for unexpected tag offload messages */
const size_t rx_headroom = sizeof(ucp_recv_desc_t) + sizeof(ucp_eager_hdr_t);

config_count = ucs_min((context->num_tls + 1) * (context->num_tls + 1) * context->num_tls,
UINT8_MAX);
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ static inline ucp_ep_h ucp_worker_ep_find(ucp_worker_h worker, uint64_t dest_uui
static UCS_F_ALWAYS_INLINE
uint64_t ucp_worker_is_tl_tag_offload(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
return 0; /* Stub for now, offload TM proto is not implemented yet */
return 0; /* Stub for now, offload TM RNDV is not implemented yet */
}

#endif
16 changes: 12 additions & 4 deletions src/ucp/tag/eager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,22 @@ void ucp_tag_eager_sync_send_ack(ucp_worker_h worker, uint64_t sender_uuid,

void ucp_tag_eager_sync_completion(ucp_request_t *req, uint16_t flag);

void ucp_tag_eager_zcopy_completion(uct_completion_t *self, ucs_status_t status);

void ucp_tag_eager_zcopy_req_complete(ucp_request_t *req);

static inline ucs_status_t ucp_tag_send_eager_short(ucp_ep_t *ep, ucp_tag_t tag,
const void *buffer, size_t length)
{
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(ucp_eager_hdr_t));
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(uint64_t));
return uct_ep_am_short(ucp_ep_get_am_uct_ep(ep), UCP_AM_ID_EAGER_ONLY, tag,
buffer, length);
if (ucp_ep_is_tag_offload_enabled(ucp_ep_config(ep))) {
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(uct_tag_t));
return uct_ep_tag_eager_short(ucp_ep_get_tag_uct_ep(ep), tag, buffer, length);
} else {
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(ucp_eager_hdr_t));
UCS_STATIC_ASSERT(sizeof(ucp_tag_t) == sizeof(uint64_t));
return uct_ep_am_short(ucp_ep_get_am_uct_ep(ep), UCP_AM_ID_EAGER_ONLY, tag,
buffer, length);
}
}

static UCS_F_ALWAYS_INLINE size_t
Expand Down
21 changes: 21 additions & 0 deletions src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "eager.h"
#include "tag_match.inl"
#include "offload.h"

#include <ucp/core/ucp_context.h>
#include <ucp/core/ucp_worker.h>
Expand Down Expand Up @@ -69,6 +70,11 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,
if (flags & UCP_RECV_DESC_FLAG_FIRST) {
UCP_WORKER_STAT_EAGER_MSG(worker, flags);
req->recv.info.sender_tag = recv_tag;

/* Cancel req in transport if it was offloaded,
* because it arrived as unexpected */
ucp_tag_offload_cancel(context, req, 1);

if (flags & UCP_RECV_DESC_FLAG_LAST) {
req->recv.info.length = recv_len;
} else {
Expand Down Expand Up @@ -169,6 +175,21 @@ static ucs_status_t ucp_eager_sync_ack_handler(void *arg, void *data,
return UCS_OK;
}

/*
 * Handle an unexpected eager message delivered through tag offload.
 *
 * The payload arrives without a UCP eager header, so one is built in the
 * bytes immediately preceding `data` and filled with the sender tag. The
 * message is then passed to ucp_eager_handler() as a single-fragment
 * (FIRST | LAST) eager message marked as OFFLOAD.
 *
 * NOTE(review): writing before `data` assumes the transport leaves at least
 * sizeof(ucp_eager_hdr_t) bytes of headroom in front of the payload -
 * confirm against the rx_headroom configured when the worker is created.
 *
 * arg    - forwarded unchanged to ucp_eager_handler()
 * data   - start of the received payload
 * length - payload length in bytes, excluding the header built here
 * flags  - forwarded unchanged to ucp_eager_handler()
 * stag   - sender tag, stored into the header
 * imm    - not used by this function
 *
 * Returns the status of ucp_eager_handler().
 */
ucs_status_t ucp_tag_offload_unexp_eager(void *arg, void *data, size_t length,
                                         unsigned flags, uct_tag_t stag, uint64_t imm)
{
    ucp_eager_hdr_t *eager_hdr;

    /* Step back one header so that the tag sits directly before the payload,
     * matching the layout used by the AM protocol. */
    eager_hdr            = (ucp_eager_hdr_t*)data - 1;
    eager_hdr->super.tag = stag;

    return ucp_eager_handler(arg, eager_hdr, length + sizeof(ucp_eager_hdr_t),
                             flags,
                             UCP_RECV_DESC_FLAG_EAGER |
                             UCP_RECV_DESC_FLAG_FIRST |
                             UCP_RECV_DESC_FLAG_LAST  |
                             UCP_RECV_DESC_FLAG_OFFLOAD,
                             sizeof(ucp_eager_hdr_t));
}

static void ucp_eager_dump(ucp_worker_h worker, uct_am_trace_type_t type,
uint8_t id, const void *data, size_t length,
char *buffer, size_t max)
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/tag/eager_snd.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ static ucs_status_t ucp_tag_eager_bcopy_multi(uct_pending_req_t *self)
return status;
}

static void ucp_tag_eager_zcopy_req_complete(ucp_request_t *req)
void ucp_tag_eager_zcopy_req_complete(ucp_request_t *req)
{
ucp_request_send_buffer_dereg(req, req->send.lane); /* TODO register+lane change */
ucp_request_complete_send(req, UCS_OK);
Expand Down Expand Up @@ -198,7 +198,7 @@ static ucs_status_t ucp_tag_eager_zcopy_multi(uct_pending_req_t *self)
ucp_tag_eager_zcopy_req_complete);
}

static void ucp_tag_eager_zcopy_completion(uct_completion_t *self,
void ucp_tag_eager_zcopy_completion(uct_completion_t *self,
ucs_status_t status)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct_comp);
Expand Down
Loading

0 comments on commit a44896d

Please sign in to comment.