diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index c81667957bf..29e41bf721d 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -55,6 +55,15 @@ enum { }; +/** + * Receive descriptor list pointers + */ +enum { + UCP_RDESC_HASH_LIST = 0, + UCP_RDESC_ALL_LIST = 1 +}; + + /* Callback for UCP requests */ typedef void (*ucp_request_callback_t)(ucp_request_t *req); @@ -143,7 +152,7 @@ struct ucp_request { * Unexpected receive descriptor. */ typedef struct ucp_recv_desc { - ucs_queue_elem_t queue; /* Queue element */ + ucs_list_link_t list[2]; /* Hash list element */ size_t length; /* Received length */ uint16_t hdr_len; /* Header size */ uint16_t flags; /* Flags */ diff --git a/src/ucp/tag/probe.c b/src/ucp/tag/probe.c index 6f2b1e08201..3b4cabb64cf 100644 --- a/src/ucp/tag/probe.c +++ b/src/ucp/tag/probe.c @@ -19,11 +19,10 @@ ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask, { ucp_recv_desc_t *rdesc; ucp_tag_hdr_t *hdr; - ucs_queue_iter_t iter; ucp_tag_t recv_tag; unsigned flags; - ucs_queue_for_each_safe(rdesc, iter, &context->tm.unexpected, queue) { + ucs_list_for_each(rdesc, &context->tm.unexpected.all, list[UCP_RDESC_ALL_LIST]) { hdr = (void*)(rdesc + 1); recv_tag = hdr->tag; flags = rdesc->flags; @@ -44,7 +43,7 @@ ucp_tag_probe_search(ucp_context_h context, ucp_tag_t tag, uint64_t tag_mask, } if (remove) { - ucs_queue_del_iter(&context->tm.unexpected, iter); + ucp_tag_unexp_remove(rdesc); } return rdesc; } diff --git a/src/ucp/tag/tag_match.c b/src/ucp/tag/tag_match.c index 970c8b34a2e..5ff582274e0 100644 --- a/src/ucp/tag/tag_match.c +++ b/src/ucp/tag/tag_match.c @@ -15,6 +15,7 @@ ucs_status_t ucp_tag_match_init(ucp_tag_match_t *tm) tm->expected.sn = 0; ucs_queue_head_init(&tm->expected.wildcard); + ucs_list_head_init(&tm->unexpected.all); tm->expected.hash = ucs_malloc(sizeof(*tm->expected.hash) * hash_size, "ucp_tm_exp_hash"); @@ -22,27 +23,35 @@ ucs_status_t ucp_tag_match_init(ucp_tag_match_t *tm) return UCS_ERR_NO_MEMORY; } + tm->unexpected.hash = ucs_malloc(sizeof(*tm->unexpected.hash) * hash_size, + "ucp_tm_unexp_hash"); + if (tm->unexpected.hash == NULL) { + ucs_free(tm->expected.hash); + return UCS_ERR_NO_MEMORY; + } + for (bucket = 0; bucket < hash_size; ++bucket) { ucs_queue_head_init(&tm->expected.hash[bucket]); + ucs_list_head_init(&tm->unexpected.hash[bucket]); } - ucs_queue_head_init(&tm->unexpected); return UCS_OK; } void ucp_tag_match_cleanup(ucp_tag_match_t *tm) { + ucs_free(tm->unexpected.hash); ucs_free(tm->expected.hash); } int ucp_tag_unexp_is_empty(ucp_tag_match_t *tm) { - return ucs_queue_is_empty(&tm->unexpected); + return ucs_list_is_empty(&tm->unexpected.all); } void ucp_tag_exp_remove(ucp_tag_match_t *tm, ucp_request_t *req) { - ucs_queue_head_t *queue = ucp_tag_exp_get_queue(tm, req); + ucs_queue_head_t *queue = ucp_tag_exp_get_req_queue(tm, req); ucs_queue_iter_t iter; ucp_request_t *qreq; diff --git a/src/ucp/tag/tag_match.h b/src/ucp/tag/tag_match.h index b637b37f0a5..9ab901ad66f 100644 --- a/src/ucp/tag/tag_match.h +++ b/src/ucp/tag/tag_match.h @@ -33,7 +33,10 @@ typedef struct ucp_tag_match { ucs_queue_head_t *hash; /* Hash table of expected non-wild tags */ uint64_t sn; } expected; - ucs_queue_head_t unexpected; /* Unexpected received descriptors */ + struct { + ucs_list_link_t all; /* Linked list of all tags */ + ucs_list_link_t *hash; /* Hash table of unexpected tags */ + } unexpected; } ucp_tag_match_t; diff --git a/src/ucp/tag/tag_match.inl b/src/ucp/tag/tag_match.inl index 76b04a115a4..2e99839e88a 100644 --- a/src/ucp/tag/tag_match.inl +++ b/src/ucp/tag/tag_match.inl @@ -70,20 +70,32 @@ ucp_tag_exp_get_queue_for_tag(ucp_tag_match_t *tm, ucp_tag_t tag) } static UCS_F_ALWAYS_INLINE ucs_queue_head_t* -ucp_tag_exp_get_queue(ucp_tag_match_t *tm, ucp_request_t *req) +ucp_tag_exp_get_queue(ucp_tag_match_t *tm, ucp_tag_t tag, ucp_tag_t tag_mask) { - if (req->recv.tag_mask == UCP_TAG_MASK_FULL) { - return ucp_tag_exp_get_queue_for_tag(tm, req->recv.tag); + if (tag_mask == UCP_TAG_MASK_FULL) { + return ucp_tag_exp_get_queue_for_tag(tm, tag); } else { return &tm->expected.wildcard; } } -static UCS_F_ALWAYS_INLINE -void ucp_tag_exp_add(ucp_tag_match_t *tm, ucp_request_t *req) +static UCS_F_ALWAYS_INLINE ucs_queue_head_t* +ucp_tag_exp_get_req_queue(ucp_tag_match_t *tm, ucp_request_t *req) +{ + return ucp_tag_exp_get_queue(tm, req->recv.tag, req->recv.tag_mask); +} + +static UCS_F_ALWAYS_INLINE void +ucp_tag_exp_push(ucp_tag_match_t *tm, ucs_queue_head_t *queue, ucp_request_t *req) { req->recv.sn = tm->expected.sn++; - ucs_queue_push(ucp_tag_exp_get_queue(tm, req), &req->recv.queue); + ucs_queue_push(queue, &req->recv.queue); +} + +static UCS_F_ALWAYS_INLINE void +ucp_tag_exp_add(ucp_tag_match_t *tm, ucp_request_t *req) +{ + ucp_tag_exp_push(tm, ucp_tag_exp_get_req_queue(tm, req), req); } static UCS_F_ALWAYS_INLINE ucp_request_t * @@ -125,12 +137,26 @@ static UCS_F_ALWAYS_INLINE ucp_tag_t ucp_rdesc_get_tag(ucp_recv_desc_t *rdesc) return ((ucp_tag_hdr_t*)(rdesc + 1))->tag; } +static UCS_F_ALWAYS_INLINE ucs_list_link_t* +ucp_tag_unexp_get_list_for_tag(ucp_tag_match_t *tm, ucp_tag_t tag) +{ + return &tm->unexpected.hash[ucp_tag_match_calc_hash(tag)]; +} + +static UCS_F_ALWAYS_INLINE void +ucp_tag_unexp_remove(ucp_recv_desc_t *rdesc) +{ + ucs_list_del(&rdesc->list[UCP_RDESC_HASH_LIST]); + ucs_list_del(&rdesc->list[UCP_RDESC_ALL_LIST] ); +} + static UCS_F_ALWAYS_INLINE ucs_status_t ucp_tag_unexp_recv(ucp_tag_match_t *tm, ucp_worker_h worker, void *data, size_t length, unsigned am_flags, uint16_t hdr_len, uint16_t flags) { ucp_recv_desc_t *rdesc = (ucp_recv_desc_t *)data - 1; + ucs_list_link_t *hash_list; ucs_status_t status; if (ucs_unlikely(am_flags & UCT_CB_FLAG_DESC)) { @@ -159,7 +185,9 @@ ucp_tag_unexp_recv(ucp_tag_match_t *tm, ucp_worker_h worker, void *data, rdesc->length = length; rdesc->hdr_len = hdr_len; - ucs_queue_push(&tm->unexpected, &rdesc->queue); + hash_list = ucp_tag_unexp_get_list_for_tag(tm, ucp_rdesc_get_tag(rdesc)); + ucs_list_add_tail(hash_list, &rdesc->list[UCP_RDESC_HASH_LIST]); + ucs_list_add_tail(&tm->unexpected.all, &rdesc->list[UCP_RDESC_ALL_LIST]); return status; } diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index 94f2f96d759..2f2d22ea726 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -14,6 +14,12 @@ #include +static UCS_F_ALWAYS_INLINE ucp_recv_desc_t* +ucp_tag_unexp_list_next(ucp_recv_desc_t *rdesc, int i_list) +{ + return ucs_list_next(&rdesc->list[i_list], ucp_recv_desc_t, list[i_list]); +} + static UCS_F_ALWAYS_INLINE ucs_status_t ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, ucp_datatype_t datatype, ucp_tag_t tag, uint64_t tag_mask, @@ -21,13 +27,32 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, ucp_tag_recv_callback_t cb, unsigned *save_rreq) { ucp_context_h context = worker->context; - ucp_recv_desc_t *rdesc; - ucs_queue_iter_t iter; + ucp_recv_desc_t *rdesc, *next; + ucs_list_link_t *list; ucs_status_t status; ucp_tag_t recv_tag; unsigned flags; + int i_list; - ucs_queue_for_each_safe(rdesc, iter, &context->tm.unexpected, queue) { + /* fast check of global unexpected queue */ + if (ucs_list_is_empty(&context->tm.unexpected.all)) { + return UCS_INPROGRESS; + } + + if (tag_mask == UCP_TAG_MASK_FULL) { + list = ucp_tag_unexp_get_list_for_tag(&context->tm, tag); + if (ucs_list_is_empty(list)) { + return UCS_INPROGRESS; + } + + i_list = UCP_RDESC_HASH_LIST; + } else { + list = &context->tm.unexpected.all; + i_list = UCP_RDESC_ALL_LIST; + } + + rdesc = ucs_list_head(list, ucp_recv_desc_t, list[i_list]); + do { recv_tag = ucp_rdesc_get_tag(rdesc); flags = rdesc->flags; ucs_trace_req("searching for %"PRIx64"/%"PRIx64"/%"PRIx64" offset %zu, " @@ -44,32 +69,41 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, { ucp_tag_log_match(recv_tag, rdesc->length - rdesc->hdr_len, req, tag, tag_mask, req->recv.state.offset, "unexpected"); - ucs_queue_del_iter(&context->tm.unexpected, iter); + ucp_tag_unexp_remove(rdesc); if (rdesc->flags & UCP_RECV_DESC_FLAG_EAGER) { UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0); status = ucp_eager_unexp_match(worker, rdesc, recv_tag, flags, buffer, buffer_size, datatype, &req->recv.state, info); ucs_trace_req("release receive descriptor %p", rdesc); - ucp_tag_unexp_desc_release(rdesc); if (status != UCS_INPROGRESS) { - return status; + goto out_release_desc; } - } else if (rdesc->flags & UCP_RECV_DESC_FLAG_RNDV) { + + next = ucp_tag_unexp_list_next(rdesc, i_list); + ucp_tag_unexp_desc_release(rdesc); + rdesc = next; + } else { + ucs_assert_always(rdesc->flags & UCP_RECV_DESC_FLAG_RNDV); *save_rreq = 0; req->recv.buffer = buffer; req->recv.length = buffer_size; req->recv.datatype = datatype; req->recv.cb = cb; ucp_rndv_matched(worker, req, (void*)(rdesc + 1)); - ucp_tag_unexp_desc_release(rdesc); UCP_WORKER_STAT_RNDV(worker, UNEXP); - return UCS_INPROGRESS; + status = UCS_INPROGRESS; + goto out_release_desc; } + } else { + rdesc = ucp_tag_unexp_list_next(rdesc, i_list); } - } - + } while (&rdesc->list[i_list] != list); return UCS_INPROGRESS; + +out_release_desc: + ucp_tag_unexp_desc_release(rdesc); + return status; } @@ -143,8 +177,10 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t buffer_size, ucp_request_t *req, ucp_tag_recv_callback_t cb, const char *debug_name) { - ucs_status_t status; unsigned save_rreq = 1; + ucs_queue_head_t *queue; + ucp_context_h context; + ucs_status_t status; ucs_trace_req("%s buffer %p buffer_size %zu tag %"PRIx64"/%"PRIx64, debug_name, buffer, buffer_size, tag, tag_mask); @@ -157,15 +193,17 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t buffer_size, } else if (save_rreq) { /* If not found on unexpected, wait until it arrives. * If was found but need this receive request for later completion, save it */ + context = worker->context; + queue = ucp_tag_exp_get_queue(&context->tm, tag, tag_mask); req->recv.buffer = buffer; req->recv.length = buffer_size; req->recv.datatype = datatype; req->recv.tag = tag; req->recv.tag_mask = tag_mask; req->recv.cb = cb; - ucp_tag_exp_add(&worker->context->tm, req); - ucs_trace_req("%s returning expected request %p (%p)", debug_name, - req, req + 1); + ucp_tag_exp_push(&context->tm, queue, req); + ucs_trace_req("%s returning expected request %p (%p)", debug_name, req, + req + 1); } return status; @@ -294,8 +332,9 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_msg_recv_nb, * to receive additional fragments. */ if (status == UCS_INPROGRESS) { - status = ucp_tag_search_unexp(worker, buffer, buffer_size, datatype, 0, - -1, req, &req->recv.info, cb, &save_rreq); + status = ucp_tag_search_unexp(worker, buffer, buffer_size, datatype, + req->recv.info.sender_tag, UCP_TAG_MASK_FULL, + req, &req->recv.info, cb, &save_rreq); } if (status != UCS_INPROGRESS) { diff --git a/test/gtest/ucp/test_ucp_tag_perf.cc b/test/gtest/ucp/test_ucp_tag_perf.cc index eb316b8a716..6ee5bd39bc5 100644 --- a/test/gtest/ucp/test_ucp_tag_perf.cc +++ b/test/gtest/ucp/test_ucp_tag_perf.cc @@ -23,7 +23,7 @@ class test_ucp_tag_perf : public test_ucp_tag { static const ucp_tag_t TAG_MASK = 0xffffffffffffffffUL; double check_perf(size_t count, bool is_exp); - void check_scalability(double max_growth, bool is_exp); + void check_scalability(double max_growth, bool is_exp, int retries); void do_sends(size_t count); }; @@ -73,42 +73,51 @@ void test_ucp_tag_perf::do_sends(size_t count) } } -void test_ucp_tag_perf::check_scalability(double max_growth, bool is_exp) +void test_ucp_tag_perf::check_scalability(double max_growth, bool is_exp, + int retries) { - double prev_time = 0.0, total_growth = 0.0; + double prev_time = 0.0, total_growth = 0.0, avg_growth; size_t n = 0; - - /* Estimate by how much the tag matching time grows when the matching queue - * length grows by 2x. A result close to 1.0 means O(1) scalability (which - * is good), while a result of 2.0 or higher means O(n) or higher. - */ - for (size_t count = 1; count <= COUNT; count *= 2) { - size_t iters = ucs_max(1ul, COUNT / count); - double total_time = 0; - for (size_t i = 0; i < iters; ++i) { - total_time += check_perf(count, is_exp); + int attempt; + + attempt = 0; + do { + ++attempt; + + /* Estimate by how much the tag matching time grows when the matching queue + * length grows by 2x. A result close to 1.0 means O(1) scalability (which + * is good), while a result of 2.0 or higher means O(n) or higher. + */ + for (size_t count = 1; count <= COUNT; count *= 2) { + size_t iters = 10 * ucs_max(1ul, COUNT / count); + double total_time = 0; + for (size_t i = 0; i < iters; ++i) { + total_time += check_perf(count, is_exp); + } + + double time = total_time / iters; + if (count >= 16) { + /* don't measure first few iterations - warmup */ + total_growth += (time / prev_time); + ++n; + } + prev_time = time; } - double time = total_time / iters; - if (count >= 16) { - /* don't measure first few iterations - warmup */ - total_growth += (time / prev_time); - ++n; - } - prev_time = time; - } + avg_growth = total_growth / n; + UCS_TEST_MESSAGE << "Average growth: " << avg_growth << + " (" << attempt << "/" << retries << ")"; + } while ((avg_growth >= max_growth) && (attempt < retries)); - double avg_growth = total_growth / n; - UCS_TEST_MESSAGE << "Average growth: " << avg_growth; EXPECT_LT(avg_growth, max_growth) << "Tag matching is not scalable"; } UCS_TEST_P(test_ucp_tag_perf, multi_exp) { - check_scalability(1.3, true); + check_scalability(1.5, true, 5); } UCS_TEST_P(test_ucp_tag_perf, multi_unexp) { - check_scalability(10.0, false); /* unexpected is not scalable yet*/ + check_scalability(1.5, false, 5); } UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_perf)