From bc408b3367c6693531df88a2e3779a8a5bed1579 Mon Sep 17 00:00:00 2001 From: Boris Karasev Date: Mon, 12 Apr 2021 12:42:58 +0300 Subject: [PATCH] UCP/AM: force eager protocol for old AM API --- src/ucp/core/ucp_am.c | 13 ++++++++++--- test/gtest/ucp/test_ucp_am.cc | 17 +++++++++++++---- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/ucp/core/ucp_am.c b/src/ucp/core/ucp_am.c index 0c509be73ac..323ac4abcf2 100644 --- a/src/ucp/core/ucp_am.c +++ b/src/ucp/core/ucp_am.c @@ -1000,7 +1000,7 @@ ucs_status_ptr_t ucp_am_send_nb(ucp_ep_h ep, uint16_t id, const void *payload, .op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE | UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_FLAGS, - .flags = flags, + .flags = flags | UCP_AM_SEND_FLAG_EAGER, .cb.send = (ucp_send_nbx_callback_t)cb, .datatype = datatype }; @@ -1486,12 +1486,20 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length, ucp_worker_h worker = arg; uint16_t am_id = rts->am.am_id; ucp_recv_desc_t *desc = NULL; + ucp_am_entry_t *am_cb = &ucs_array_elem(&worker->am, am_id); ucp_ep_h ep; - ucp_am_entry_t *am_cb; ucp_am_recv_param_t param; ucs_status_t status, desc_status; void *hdr; + if (ENABLE_PARAMS_CHECK && !(am_cb->flags & UCP_AM_CB_PRIV_FLAG_NBX)) { + ucs_error("active message callback registered with " + "ucp_worker_set_am_handler() API does not support rendezvous " + "protocol, the sender side should use ucp_am_send_nbx() API"); + status = UCS_ERR_INVALID_PARAM; + goto out_send_ats; + } + UCP_WORKER_GET_VALID_EP_BY_ID(&ep, worker, rts->super.sreq.ep_id, { status = UCS_ERR_CANCELED; goto out_send_ats; }, @@ -1520,7 +1528,6 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length, goto out_send_ats; } - am_cb = &ucs_array_elem(&worker->am, am_id); param.recv_attr = UCP_AM_RECV_ATTR_FLAG_RNDV | ucp_am_hdr_reply_ep(worker, rts->am.flags, ep, ¶m.reply_ep); diff --git a/test/gtest/ucp/test_ucp_am.cc b/test/gtest/ucp/test_ucp_am.cc index 8bfbdb571ea..ba8e83b6e19 100644 --- a/test/gtest/ucp/test_ucp_am.cc +++ b/test/gtest/ucp/test_ucp_am.cc @@ -255,7 +255,7 @@ void test_ucp_am::do_set_am_handler_realloc_test() do_send_process_data_test(0, UCP_SEND_ID + 1, 0); } -UCS_TEST_P(test_ucp_am, send_process_am, "RNDV_THRESH=-1") +UCS_TEST_P(test_ucp_am, send_process_am) { set_handlers(UCP_SEND_ID); do_send_process_data_test(0, UCP_SEND_ID, 0); @@ -264,13 +264,22 @@ UCS_TEST_P(test_ucp_am, send_process_am, "RNDV_THRESH=-1") do_send_process_data_test(0, UCP_SEND_ID, UCP_AM_SEND_REPLY); } -UCS_TEST_P(test_ucp_am, send_process_am_release, "RNDV_THRESH=-1") +UCS_TEST_P(test_ucp_am, send_process_am_rndv, "RNDV_THRESH=1") +{ + set_handlers(UCP_SEND_ID); + do_send_process_data_test(0, UCP_SEND_ID, 0); + + set_reply_handlers(); + do_send_process_data_test(0, UCP_SEND_ID, UCP_AM_SEND_REPLY); +} + +UCS_TEST_P(test_ucp_am, send_process_am_release) { set_handlers(UCP_SEND_ID); do_send_process_data_test(UCP_RELEASE, 0, 0); } -UCS_TEST_P(test_ucp_am, send_process_iov_am, "RNDV_THRESH=-1") +UCS_TEST_P(test_ucp_am, send_process_iov_am) { ucs::detail::message_stream ms("INFO"); @@ -286,7 +295,7 @@ UCS_TEST_P(test_ucp_am, send_process_iov_am, "RNDV_THRESH=-1") } } -UCS_TEST_P(test_ucp_am, set_am_handler_realloc, "RNDV_THRESH=-1") +UCS_TEST_P(test_ucp_am, set_am_handler_realloc) { do_set_am_handler_realloc_test(); }