Skip to content

Commit

Permalink
Merge pull request openucx#1520 from yosefe/topic/ucp-tag-unexp-hash
Browse files Browse the repository at this point in the history
UCP/TAG: Use hash table for matching unexpected receive descriptors.
  • Loading branch information
yosefe authored May 19, 2017
2 parents 3d479e2 + fc5744c commit 1c650ec
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 57 deletions.
11 changes: 10 additions & 1 deletion src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ enum {
};


/**
 * Indices into the list[] link array of a receive descriptor.
 *
 * Every unexpected receive descriptor is linked into two lists at once;
 * these constants select which of its two link elements is meant.
 */
enum {
    /* Link in the hash-bucket list selected by the descriptor's tag */
    UCP_RDESC_HASH_LIST = 0,

    /* Link in the single list that holds all unexpected descriptors */
    UCP_RDESC_ALL_LIST  = 1
};


/* Callback for UCP requests */
typedef void (*ucp_request_callback_t)(ucp_request_t *req);

Expand Down Expand Up @@ -143,7 +152,7 @@ struct ucp_request {
* Unexpected receive descriptor.
*/
typedef struct ucp_recv_desc {
ucs_queue_elem_t queue; /* Queue element */
ucs_list_link_t list[2]; /* Hash list element */
size_t length; /* Received length */
uint16_t hdr_len; /* Header size */
uint16_t flags; /* Flags */
Expand Down
5 changes: 2 additions & 3 deletions src/ucp/tag/probe.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask,
{
ucp_recv_desc_t *rdesc;
ucp_tag_hdr_t *hdr;
ucs_queue_iter_t iter;
ucp_tag_t recv_tag;
unsigned flags;

ucs_queue_for_each_safe(rdesc, iter, &context->tm.unexpected, queue) {
ucs_list_for_each(rdesc, &context->tm.unexpected.all, list[UCP_RDESC_ALL_LIST]) {
hdr = (void*)(rdesc + 1);
recv_tag = hdr->tag;
flags = rdesc->flags;
Expand All @@ -44,7 +43,7 @@ ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask,
}

if (remove) {
ucs_queue_del_iter(&context->tm.unexpected, iter);
ucp_tag_unexp_remove(rdesc);
}
return rdesc;
}
Expand Down
15 changes: 12 additions & 3 deletions src/ucp/tag/tag_match.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,43 @@ ucs_status_t ucp_tag_match_init(ucp_tag_match_t *tm)

tm->expected.sn = 0;
ucs_queue_head_init(&tm->expected.wildcard);
ucs_list_head_init(&tm->unexpected.all);

tm->expected.hash = ucs_malloc(sizeof(*tm->expected.hash) * hash_size,
"ucp_tm_exp_hash");
if (tm->expected.hash == NULL) {
return UCS_ERR_NO_MEMORY;
}

tm->unexpected.hash = ucs_malloc(sizeof(*tm->unexpected.hash) * hash_size,
"ucp_tm_unexp_hash");
if (tm->unexpected.hash == NULL) {
ucs_free(tm->expected.hash);
return UCS_ERR_NO_MEMORY;
}

for (bucket = 0; bucket < hash_size; ++bucket) {
ucs_queue_head_init(&tm->expected.hash[bucket]);
ucs_list_head_init(&tm->unexpected.hash[bucket]);
}

ucs_queue_head_init(&tm->unexpected);
return UCS_OK;
}

/**
 * Release the hash tables held by a tag-matching context.
 *
 * @param tm  Tag-matching context whose unexpected and expected hash tables
 *            are freed.
 */
void ucp_tag_match_cleanup(ucp_tag_match_t *tm)
{
    /* The unexpected table is freed first, then the expected table */
    ucs_free(tm->unexpected.hash);
    ucs_free(tm->expected.hash);
}

int ucp_tag_unexp_is_empty(ucp_tag_match_t *tm)
{
return ucs_queue_is_empty(&tm->unexpected);
return ucs_list_is_empty(&tm->unexpected.all);
}

void ucp_tag_exp_remove(ucp_tag_match_t *tm, ucp_request_t *req)
{
ucs_queue_head_t *queue = ucp_tag_exp_get_queue(tm, req);
ucs_queue_head_t *queue = ucp_tag_exp_get_req_queue(tm, req);
ucs_queue_iter_t iter;
ucp_request_t *qreq;

Expand Down
5 changes: 4 additions & 1 deletion src/ucp/tag/tag_match.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ typedef struct ucp_tag_match {
ucs_queue_head_t *hash; /* Hash table of expected non-wild tags */
uint64_t sn;
} expected;
ucs_queue_head_t unexpected; /* Unexpected received descriptors */
struct {
ucs_list_link_t all; /* Linked list of all tags */
ucs_list_link_t *hash; /* Hash table of unexpected tags */
} unexpected;
} ucp_tag_match_t;


Expand Down
42 changes: 35 additions & 7 deletions src/ucp/tag/tag_match.inl
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,32 @@ ucp_tag_exp_get_queue_for_tag(ucp_tag_match_t *tm, ucp_tag_t tag)
}

static UCS_F_ALWAYS_INLINE ucs_queue_head_t*
ucp_tag_exp_get_queue(ucp_tag_match_t *tm, ucp_request_t *req)
ucp_tag_exp_get_queue(ucp_tag_match_t *tm, ucp_tag_t tag, ucp_tag_t tag_mask)
{
if (req->recv.tag_mask == UCP_TAG_MASK_FULL) {
return ucp_tag_exp_get_queue_for_tag(tm, req->recv.tag);
if (tag_mask == UCP_TAG_MASK_FULL) {
return ucp_tag_exp_get_queue_for_tag(tm, tag);
} else {
return &tm->expected.wildcard;
}
}

static UCS_F_ALWAYS_INLINE
void ucp_tag_exp_add(ucp_tag_match_t *tm, ucp_request_t *req)
/**
 * Select the expected queue for a receive request.
 *
 * Convenience wrapper that takes the tag and tag mask from the request and
 * forwards them to ucp_tag_exp_get_queue().
 *
 * @param tm   Tag-matching context.
 * @param req  Receive request; its recv.tag and recv.tag_mask are read.
 *
 * @return The queue returned by ucp_tag_exp_get_queue() for that tag/mask.
 */
static UCS_F_ALWAYS_INLINE ucs_queue_head_t*
ucp_tag_exp_get_req_queue(ucp_tag_match_t *tm, ucp_request_t *req)
{
    ucp_tag_t req_tag  = req->recv.tag;
    ucp_tag_t req_mask = req->recv.tag_mask;

    return ucp_tag_exp_get_queue(tm, req_tag, req_mask);
}

static UCS_F_ALWAYS_INLINE void
ucp_tag_exp_push(ucp_tag_match_t *tm, ucs_queue_head_t *queue, ucp_request_t *req)
{
req->recv.sn = tm->expected.sn++;
ucs_queue_push(ucp_tag_exp_get_queue(tm, req), &req->recv.queue);
ucs_queue_push(queue, &req->recv.queue);
}

/**
 * Add a receive request to the expected queue that matches its own
 * tag and tag mask.
 *
 * @param tm   Tag-matching context.
 * @param req  Receive request to add.
 */
static UCS_F_ALWAYS_INLINE void
ucp_tag_exp_add(ucp_tag_match_t *tm, ucp_request_t *req)
{
    ucs_queue_head_t *target = ucp_tag_exp_get_req_queue(tm, req);

    ucp_tag_exp_push(tm, target, req);
}

static UCS_F_ALWAYS_INLINE ucp_request_t *
Expand Down Expand Up @@ -125,12 +137,26 @@ static UCS_F_ALWAYS_INLINE ucp_tag_t ucp_rdesc_get_tag(ucp_recv_desc_t *rdesc)
return ((ucp_tag_hdr_t*)(rdesc + 1))->tag;
}

/**
 * Locate the unexpected-descriptor hash bucket for a tag.
 *
 * @param tm   Tag-matching context that owns the unexpected hash table.
 * @param tag  Tag to hash.
 *
 * @return Pointer to the list head stored at index
 *         ucp_tag_match_calc_hash(tag) of tm->unexpected.hash.
 */
static UCS_F_ALWAYS_INLINE ucs_list_link_t*
ucp_tag_unexp_get_list_for_tag(ucp_tag_match_t *tm, ucp_tag_t tag)
{
    return tm->unexpected.hash + ucp_tag_match_calc_hash(tag);
}

/**
 * Unlink an unexpected receive descriptor from both lists it belongs to.
 *
 * @param rdesc  Descriptor to unlink. It is only unlinked here, not released.
 */
static UCS_F_ALWAYS_INLINE void
ucp_tag_unexp_remove(ucp_recv_desc_t *rdesc)
{
    ucs_list_link_t *links = rdesc->list;

    /* Hash-bucket link first, then the link in the list of all descriptors */
    ucs_list_del(&links[UCP_RDESC_HASH_LIST]);
    ucs_list_del(&links[UCP_RDESC_ALL_LIST]);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_tag_unexp_recv(ucp_tag_match_t *tm, ucp_worker_h worker, void *data,
size_t length, unsigned am_flags, uint16_t hdr_len,
uint16_t flags)
{
ucp_recv_desc_t *rdesc = (ucp_recv_desc_t *)data - 1;
ucs_list_link_t *hash_list;
ucs_status_t status;

if (ucs_unlikely(am_flags & UCT_CB_FLAG_DESC)) {
Expand Down Expand Up @@ -159,7 +185,9 @@ ucp_tag_unexp_recv(ucp_tag_match_t *tm, ucp_worker_h worker, void *data,

rdesc->length = length;
rdesc->hdr_len = hdr_len;
ucs_queue_push(&tm->unexpected, &rdesc->queue);
hash_list = ucp_tag_unexp_get_list_for_tag(tm, ucp_rdesc_get_tag(rdesc));
ucs_list_add_tail(hash_list, &rdesc->list[UCP_RDESC_HASH_LIST]);
ucs_list_add_tail(&tm->unexpected.all, &rdesc->list[UCP_RDESC_ALL_LIST]);
return status;
}

Expand Down
73 changes: 56 additions & 17 deletions src/ucp/tag/tag_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,45 @@
#include <ucs/datastruct/queue.h>


/**
 * Step to the descriptor that follows @a rdesc on one of its two lists.
 *
 * @param rdesc   Current receive descriptor.
 * @param i_list  Which link of rdesc->list[] to follow
 *                (UCP_RDESC_HASH_LIST or UCP_RDESC_ALL_LIST).
 *
 * @return The ucp_recv_desc_t that contains the next link on that list.
 *         NOTE(review): the search loop in this file detects the end of the
 *         list by comparing &result->list[i_list] with the list head, so the
 *         result may not be a real descriptor - confirm before dereferencing.
 */
static UCS_F_ALWAYS_INLINE ucp_recv_desc_t*
ucp_tag_unexp_list_next(ucp_recv_desc_t *rdesc, int i_list)
{
    ucs_list_link_t *cur_link = &rdesc->list[i_list];

    return ucs_list_next(cur_link, ucp_recv_desc_t, list[i_list]);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size,
ucp_datatype_t datatype, ucp_tag_t tag, uint64_t tag_mask,
ucp_request_t *req, ucp_tag_recv_info_t *info,
ucp_tag_recv_callback_t cb, unsigned *save_rreq)
{
ucp_context_h context = worker->context;
ucp_recv_desc_t *rdesc;
ucs_queue_iter_t iter;
ucp_recv_desc_t *rdesc, *next;
ucs_list_link_t *list;
ucs_status_t status;
ucp_tag_t recv_tag;
unsigned flags;
int i_list;

ucs_queue_for_each_safe(rdesc, iter, &context->tm.unexpected, queue) {
/* fast check of global unexpected queue */
if (ucs_list_is_empty(&context->tm.unexpected.all)) {
return UCS_INPROGRESS;
}

if (tag_mask == UCP_TAG_MASK_FULL) {
list = ucp_tag_unexp_get_list_for_tag(&context->tm, tag);
if (ucs_list_is_empty(list)) {
return UCS_INPROGRESS;
}

i_list = UCP_RDESC_HASH_LIST;
} else {
list = &context->tm.unexpected.all;
i_list = UCP_RDESC_ALL_LIST;
}

rdesc = ucs_list_head(list, ucp_recv_desc_t, list[i_list]);
do {
recv_tag = ucp_rdesc_get_tag(rdesc);
flags = rdesc->flags;
ucs_trace_req("searching for %"PRIx64"/%"PRIx64"/%"PRIx64" offset %zu, "
Expand All @@ -44,32 +69,41 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size,
{
ucp_tag_log_match(recv_tag, rdesc->length - rdesc->hdr_len, req, tag,
tag_mask, req->recv.state.offset, "unexpected");
ucs_queue_del_iter(&context->tm.unexpected, iter);
ucp_tag_unexp_remove(rdesc);
if (rdesc->flags & UCP_RECV_DESC_FLAG_EAGER) {
UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0);
status = ucp_eager_unexp_match(worker, rdesc, recv_tag, flags,
buffer, buffer_size, datatype,
&req->recv.state, info);
ucs_trace_req("release receive descriptor %p", rdesc);
ucp_tag_unexp_desc_release(rdesc);
if (status != UCS_INPROGRESS) {
return status;
goto out_release_desc;
}
} else if (rdesc->flags & UCP_RECV_DESC_FLAG_RNDV) {

next = ucp_tag_unexp_list_next(rdesc, i_list);
ucp_tag_unexp_desc_release(rdesc);
rdesc = next;
} else {
ucs_assert_always(rdesc->flags & UCP_RECV_DESC_FLAG_RNDV);
*save_rreq = 0;
req->recv.buffer = buffer;
req->recv.length = buffer_size;
req->recv.datatype = datatype;
req->recv.cb = cb;
ucp_rndv_matched(worker, req, (void*)(rdesc + 1));
ucp_tag_unexp_desc_release(rdesc);
UCP_WORKER_STAT_RNDV(worker, UNEXP);
return UCS_INPROGRESS;
status = UCS_INPROGRESS;
goto out_release_desc;
}
} else {
rdesc = ucp_tag_unexp_list_next(rdesc, i_list);
}
}

} while (&rdesc->list[i_list] != list);
return UCS_INPROGRESS;

out_release_desc:
ucp_tag_unexp_desc_release(rdesc);
return status;
}


Expand Down Expand Up @@ -143,8 +177,10 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t buffer_size,
ucp_request_t *req, ucp_tag_recv_callback_t cb,
const char *debug_name)
{
ucs_status_t status;
unsigned save_rreq = 1;
ucs_queue_head_t *queue;
ucp_context_h context;
ucs_status_t status;

ucs_trace_req("%s buffer %p buffer_size %zu tag %"PRIx64"/%"PRIx64, debug_name,
buffer, buffer_size, tag, tag_mask);
Expand All @@ -157,15 +193,17 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t buffer_size,
} else if (save_rreq) {
/* If not found on unexpected, wait until it arrives.
* If was found but need this receive request for later completion, save it */
context = worker->context;
queue = ucp_tag_exp_get_queue(&context->tm, tag, tag_mask);
req->recv.buffer = buffer;
req->recv.length = buffer_size;
req->recv.datatype = datatype;
req->recv.tag = tag;
req->recv.tag_mask = tag_mask;
req->recv.cb = cb;
ucp_tag_exp_add(&worker->context->tm, req);
ucs_trace_req("%s returning expected request %p (%p)", debug_name,
req, req + 1);
ucp_tag_exp_push(&context->tm, queue, req);
ucs_trace_req("%s returning expected request %p (%p)", debug_name, req,
req + 1);
}

return status;
Expand Down Expand Up @@ -294,8 +332,9 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_msg_recv_nb,
* to receive additional fragments.
*/
if (status == UCS_INPROGRESS) {
status = ucp_tag_search_unexp(worker, buffer, buffer_size, datatype, 0,
-1, req, &req->recv.info, cb, &save_rreq);
status = ucp_tag_search_unexp(worker, buffer, buffer_size, datatype,
req->recv.info.sender_tag, UCP_TAG_MASK_FULL,
req, &req->recv.info, cb, &save_rreq);
}

if (status != UCS_INPROGRESS) {
Expand Down
Loading

0 comments on commit 1c650ec

Please sign in to comment.