Skip to content

Commit

Permalink
UCP/RNDV: Add more debug data for rndv
Browse files Browse the repository at this point in the history
- RTS send/recv sequence number
- Save all send requests
- Receive request id
  • Loading branch information
yosefe committed Sep 13, 2020
1 parent 8bf84d4 commit 67d1965
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 40 deletions.
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ struct ucp_request {
ucp_tag_t tag;
uintptr_t rreq_ptr; /* receive request ptr on the
recv side (used in AM rndv) */
uint64_t rts_send_seq; /* sequence number of actual rts send */
} tag;

struct {
Expand Down Expand Up @@ -270,6 +271,7 @@ struct ucp_request {
ucp_worker_t *worker;
uct_tag_context_t uct_ctx; /* Transport offload context */
unsigned prev_flags;
uint64_t req_id;

union {
struct {
Expand Down Expand Up @@ -344,6 +346,7 @@ struct ucp_recv_desc {
uint32_t length; /* Received length */
uint32_t payload_offset; /* Offset from end of the descriptor
* to AM data */
uint64_t rndv_rts_seq; /* RNDV: rts sequence number */
uint16_t flags; /* Flags */
int16_t priv_length; /* Number of bytes consumed from
headroom private space, except the
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,8 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->am_message_id = ucs_generate_uuid(0);
worker->rkey_ptr_cb_id = UCS_CALLBACKQ_ID_NULL;
worker->rndv_req_id = 1;
worker->rndv_rts_send_seq = 0;
worker->rndv_rts_recv_seq = 0;
ucs_queue_head_init(&worker->rkey_ptr_reqs);
ucs_list_head_init(&worker->arm_ifaces);
ucs_list_head_init(&worker->stream_ready_eps);
Expand Down
10 changes: 10 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ typedef struct ucp_worker {
khash_t(ucp_worker_rndv_req_ptrs) rndv_req_ptrs;
uint64_t rndv_req_id;

uint64_t rndv_rts_send_seq;
uint64_t rndv_rts_recv_seq;

ucp_ep_match_ctx_t ep_match_ctx; /* Endpoint-to-endpoint matching context */
ucp_worker_iface_t **ifaces; /* Array of pointers to interfaces,
one for each resource */
Expand Down Expand Up @@ -375,4 +378,11 @@ ucp_worker_sockaddr_is_cm_proto(const ucp_worker_h worker)
return !!ucp_worker_num_cm_cmpts(worker);
}

static inline ucp_tag_rndv_debug_entry_t*
ucp_worker_rndv_debug_entry(ucp_worker_h worker, uint64_t req_id)
{
size_t elem_index = req_id % worker->tm.rndv_debug.queue_length;
return &worker->tm.rndv_debug.queue[elem_index];
}

#endif
4 changes: 2 additions & 2 deletions src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,15 @@ UCS_PROFILE_FUNC_VOID(ucp_tag_offload_rndv_cb,
ucs_assert(header_length >= sizeof(ucp_rndv_rts_hdr_t));

if (UCP_MEM_IS_ACCESSIBLE_FROM_CPU(req->recv.mem_type)) {
ucp_rndv_matched(req->recv.worker, req, header);
ucp_rndv_matched(req->recv.worker, req, header, 0);
} else {
/* SW rendezvous request is stored in the user buffer (temporarily)
when matched. If user buffer allocated on GPU memory, need to "pack"
it to the host memory staging buffer for further processing. */
header_host_copy = ucs_alloca(header_length);
ucp_mem_type_pack(req->recv.worker, header_host_copy, header,
header_length, req->recv.mem_type);
ucp_rndv_matched(req->recv.worker, req, header_host_copy);
ucp_rndv_matched(req->recv.worker, req, header_host_copy, 0);
}

ucp_tag_offload_release_buf(req, 0);
Expand Down
170 changes: 135 additions & 35 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,114 @@ static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status)
ucp_request_complete_send(sreq, status);
}

/* add debug entry for rndv send flow */
static ucp_tag_rndv_debug_entry_t*
ucp_rndv_add_debug_entry_common(ucp_request_t *req)
{
ucp_worker_h worker = req->send.ep->worker;
ucp_tag_rndv_debug_entry_t *entry = ucp_worker_rndv_debug_entry(worker,
req->send.rndv_req_id);
entry->id = req->send.rndv_req_id;
entry->ep = req->send.ep;
entry->local_address = req->send.buffer;
entry->size = req->send.length;

return entry;
}

/* add debug entry for rndv_get flow */
static void
ucp_rndv_get_req_add_debug_entry(ucp_request_t *rndv_req,
const ucp_rndv_rts_hdr_t *rndv_rts_hdr,
uint64_t rts_seq)
{
ucp_worker_h worker = rndv_req->send.ep->worker;
ucp_request_t *rreq = rndv_req->send.rndv_get.rreq;
ucp_tag_rndv_debug_entry_t *entry;

/* set request id */
rndv_req->send.rndv_req_id = worker->rndv_req_id;
worker->rndv_req_id++;

/* add entry for rndv_get request */
entry = ucp_rndv_add_debug_entry_common(rndv_req);
entry->type = "rndv_get";
entry->rts_seq = rts_seq;
entry->send_tag = rndv_rts_hdr->super.tag;
entry->recv_tag = rreq->recv.tag.tag;
entry->remote_address = rndv_req->send.rndv_get.remote_address;
entry->remote_reqptr = rndv_req->send.rndv_get.remote_request;
entry->rndv_get_req = rndv_req;
entry->recv_req = rreq;
entry->send_req = NULL;

/* add more data in existing receive request entry */
entry = ucp_worker_rndv_debug_entry(worker, rreq->recv.req_id);
entry->ep = rndv_req->send.ep;
entry->rts_seq = rts_seq;
entry->send_tag = rndv_rts_hdr->super.tag;
entry->remote_address = rndv_req->send.rndv_get.remote_address;
entry->remote_reqptr = rndv_req->send.rndv_get.remote_request;
entry->rndv_get_req = rndv_req;
entry->send_req = NULL;
}

/* add debug entry for rndv send flow */
static void
ucp_rndv_send_add_debug_entry(ucp_request_t *req)
{
ucp_tag_rndv_debug_entry_t *entry;

entry = ucp_rndv_add_debug_entry_common(req);
entry->type = "rndv_send";
entry->rts_seq = 0;
entry->send_tag = req->send.msg_proto.tag.tag;
entry->recv_tag = 0;
entry->remote_address = 0;
entry->remote_reqptr = 0;
entry->rndv_get_req = NULL;
entry->recv_req = NULL;
entry->send_req = req;
}

/* to be used from debugger */
void ucp_rndv_print_debug_data(ucp_worker_h worker, const char *filename,
ucp_tag_t send_tag)
{
ucp_tag_rndv_debug_entry_t *entry;
size_t i, count;
FILE *file;

if (filename == NULL) {
file = stdout;
} else {
file = fopen(filename, "w");
if (file == NULL) {
fprintf(stderr, "cannot open %s: %m\n", filename);
return;
}
}

count = ucs_min(worker->tm.rndv_debug.queue_length, worker->rndv_req_id);
for (i = 0; i < count; ++i) {
entry = &worker->tm.rndv_debug.queue[i];
if ((send_tag != 0) && (send_tag != entry->send_tag)) {
continue;
}
fprintf(file,
"%s id %lu rts_seq %lu stag 0x%lx rtag 0x%lx rva 0x%lx rmreq 0x%lx "
"lva %p sz %zu greq %p rreq %p sreq %p\n",
entry->type, entry->id, entry->rts_seq, entry->send_tag,
entry->recv_tag, entry->remote_address, entry->remote_reqptr,
entry->local_address, entry->size, entry->rndv_get_req,
entry->recv_req, entry->send_req);
}

if (filename != NULL) {
fclose(file);
}
}

size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
{
ucp_request_t *sreq = arg; /* send request */
Expand Down Expand Up @@ -108,14 +216,21 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rts, (self),
uct_pending_req_t *self)
{
ucp_request_t *sreq = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_h ep = sreq->send.ep;
ucp_worker_h worker = ep->worker;
size_t packed_rkey_size;
ucs_status_t status;

/* send the RTS. the pack_cb will pack all the necessary fields in the RTS */
packed_rkey_size = ucp_ep_config(sreq->send.ep)->tag.rndv.rkey_size;
packed_rkey_size = ucp_ep_config(ep)->tag.rndv.rkey_size;
status = ucp_do_am_single(self, UCP_AM_ID_RNDV_RTS, ucp_tag_rndv_rts_pack,
sizeof(ucp_rndv_rts_hdr_t) + packed_rkey_size);
if (status == UCS_OK) {
sreq->send.msg_proto.tag.rts_send_seq = worker->rndv_rts_send_seq++;
if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
ucp_worker_rndv_debug_entry(worker, sreq->send.rndv_req_id)->rts_seq =
sreq->send.msg_proto.tag.rts_send_seq;
}
sreq->flags |= UCP_REQUEST_FLAG_RNDV_RTS_SENT;
return status;
} else if (status == UCS_ERR_NO_RESOURCE) {
Expand Down Expand Up @@ -256,6 +371,11 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)

sreq->flags |= UCP_REQUEST_FLAG_SEND_RNDV;

sreq->send.rndv_req_id = worker->rndv_req_id++;
if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
ucp_rndv_send_add_debug_entry(sreq);
}

status = ucp_ep_resolve_dest_ep_ptr(ep, sreq->send.lane);
if (status != UCS_OK) {
return status;
Expand All @@ -272,19 +392,17 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)
/* add the rndv send request to a hash on the worker. the key is a unique
* value on the worker */
khiter = kh_put(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs,
worker->rndv_req_id, &ret);
sreq->send.rndv_req_id, &ret);
if (ret < 1 ) {
ucs_warn("failed to add rndv req id (%zu) to worker %p rndv req ptrs hash",
worker->rndv_req_id, worker);
sreq->send.rndv_req_id, worker);
}

sreq->send.rndv_req_id = worker->rndv_req_id;
kh_value(&worker->rndv_req_ptrs, khiter) = (uintptr_t)sreq;

ucs_debug("added sreq %p to hash with key %zu. worker %p",
sreq, sreq->send.rndv_req_id, worker);

worker->rndv_req_id++;

return status;
}
Expand Down Expand Up @@ -713,32 +831,9 @@ ucp_rndv_req_init_zcopy_lane_map(ucp_request_t *rndv_req)
rndv_req->send.rndv_get.lanes_count = ucs_popcount(lane_map);
}

static void ucp_rndv_req_add_debug_entry(ucp_request_t *rndv_req,
const ucp_rndv_rts_hdr_t *rndv_rts_hdr)
{
ucp_worker_h worker = rndv_req->send.ep->worker;
ucp_tag_rndv_debug_entry_t *entry;
size_t elem_index;

/* set request id */
rndv_req->send.rndv_req_id = worker->rndv_req_id;
worker->rndv_req_id++;

elem_index = rndv_req->send.rndv_req_id %
worker->tm.rndv_debug.queue_length;
entry = &worker->tm.rndv_debug.queue[elem_index];
entry->id = rndv_req->send.rndv_req_id;
entry->ep = rndv_req->send.ep;
entry->send_tag = rndv_rts_hdr->super.tag;
entry->recv_tag = rndv_req->send.rndv_get.rreq->recv.tag.tag;
entry->remote_address = rndv_req->send.rndv_get.remote_address;
entry->local_address = rndv_req->send.buffer;
entry->size = rndv_req->send.length;
entry->req = rndv_req;
}

static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rreq,
const ucp_rndv_rts_hdr_t *rndv_rts_hdr)
const ucp_rndv_rts_hdr_t *rndv_rts_hdr,
uint64_t rts_seq)
{
ucp_worker_h worker = rndv_req->send.ep->worker;
ucs_status_t status;
Expand Down Expand Up @@ -769,7 +864,7 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr
ucp_rndv_req_init_zcopy_lane_map(rndv_req);

if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
ucp_rndv_req_add_debug_entry(rndv_req, rndv_rts_hdr);
ucp_rndv_get_req_add_debug_entry(rndv_req, rndv_rts_hdr, rts_seq);
}

if (worker->context->config.ext.rdnv_defer_sched) {
Expand Down Expand Up @@ -993,9 +1088,10 @@ ucp_rndv_test_zcopy_scheme_support(size_t length, size_t min_zcopy,
/* or can the message be split? */ split);
}

UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr, rts_seq),
ucp_worker_h worker, ucp_request_t *rreq,
const ucp_rndv_rts_hdr_t *rndv_rts_hdr)
const ucp_rndv_rts_hdr_t *rndv_rts_hdr,
uint64_t rts_seq)
{
ucp_rndv_mode_t rndv_mode;
ucp_request_t *rndv_req;
Expand Down Expand Up @@ -1064,7 +1160,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
ep_config->tag.rndv.max_get_zcopy,
ep_config->tag.rndv.get_zcopy_split)) {
/* try to fetch the data with a get_zcopy operation */
ucp_rndv_req_send_rma_get(rndv_req, rreq, rndv_rts_hdr);
ucp_rndv_req_send_rma_get(rndv_req, rreq, rndv_rts_hdr, rts_seq);
goto out;
} else if (rndv_mode == UCP_RNDV_MODE_AUTO) {
/* check if we need pipelined memtype staging */
Expand Down Expand Up @@ -1148,6 +1244,9 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length,
ucp_recv_desc_t *rdesc;
ucp_request_t *rreq;
ucs_status_t status;
uint64_t seq;

seq = worker->rndv_rts_recv_seq++;

if (rndv_rts_hdr->status == UCS_ERR_CANCELED) {
ucp_rndv_unexp_cancel(worker, rndv_rts_hdr);
Expand All @@ -1156,7 +1255,7 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length,

rreq = ucp_tag_exp_search(&worker->tm, rndv_rts_hdr->super.tag);
if (rreq != NULL) {
ucp_rndv_matched(worker, rreq, rndv_rts_hdr);
ucp_rndv_matched(worker, rreq, rndv_rts_hdr, seq);

/* Cancel req in transport if it was offloaded, because it arrived
as unexpected */
Expand All @@ -1169,6 +1268,7 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length,
sizeof(*rndv_rts_hdr),
UCP_RECV_DESC_FLAG_RNDV, 0, &rdesc);
if (!UCS_STATUS_IS_ERR(status)) {
rdesc->rndv_rts_seq = seq;
ucp_tag_unexp_recv(&worker->tm, rdesc, rndv_rts_hdr->super.tag);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/tag/rndv.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *req);
void ucp_tag_rndv_cancel(ucp_request_t *sreq);

void ucp_rndv_matched(ucp_worker_h worker, ucp_request_t *req,
const ucp_rndv_rts_hdr_t *rndv_rts_hdr);
const ucp_rndv_rts_hdr_t *rndv_rts_hdr,
uint64_t rts_seq);

ucs_status_t ucp_rndv_progress_rma_get_zcopy(uct_pending_req_t *self);

Expand Down
7 changes: 6 additions & 1 deletion src/ucp/tag/tag_match.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,19 @@ KHASH_INIT(ucp_tag_frag_hash, uint64_t, ucp_tag_frag_match_t, 1,


typedef struct ucp_tag_rndv_debug_entry {
const char *type;
uint64_t id;
uint64_t rts_seq;
ucp_ep_h ep;
ucp_tag_t send_tag;
ucp_tag_t recv_tag;
uintptr_t remote_address;
uintptr_t remote_reqptr;
void *local_address;
size_t size;
ucp_request_t *req;
ucp_request_t *rndv_get_req;
ucp_request_t *send_req;
ucp_request_t *recv_req;
} ucp_tag_rndv_debug_entry_t;


Expand Down
Loading

0 comments on commit 67d1965

Please sign in to comment.