Skip to content

Commit

Permalink
Merge pull request openucx#6665 from karasevb/1.10.x/ucp_old_am_fix
Browse files Browse the repository at this point in the history
UCP/AM: force eager protocol for old AM API - v1.10.x
  • Loading branch information
yosefe authored Apr 20, 2021
2 parents cd50625 + 0455f2f commit 590848b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
* Fix in a release flow of deferred data
* Fixes for invalid ID and handling of status in RNDV
* Fixes in short active message reply protocol
* Fix backward compatibility of Active Message API semantics
#### CUDA
* Fixes in managed memory support
* Fixes in topology detection
Expand Down
13 changes: 10 additions & 3 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ ucs_status_ptr_t ucp_am_send_nb(ucp_ep_h ep, uint16_t id, const void *payload,
.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE |
UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_FLAGS,
.flags = flags,
.flags = flags | UCP_AM_SEND_FLAG_EAGER,
.cb.send = (ucp_send_nbx_callback_t)cb,
.datatype = datatype
};
Expand Down Expand Up @@ -1419,12 +1419,20 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length,
ucp_worker_h worker = arg;
uint16_t am_id = rts->am.am_id;
ucp_recv_desc_t *desc = NULL;
ucp_am_entry_t *am_cb = &ucs_array_elem(&worker->am, am_id);
ucp_ep_h ep;
ucp_am_entry_t *am_cb;
ucp_am_recv_param_t param;
ucs_status_t status, desc_status;
void *hdr;

if (ENABLE_PARAMS_CHECK && !(am_cb->flags & UCP_AM_CB_PRIV_FLAG_NBX)) {
ucs_error("active message callback registered with "
"ucp_worker_set_am_handler() API does not support rendezvous "
"protocol, the sender side should use ucp_am_send_nbx() API");
status = UCS_ERR_INVALID_PARAM;
goto out_send_ats;
}

ep = UCP_WORKER_GET_VALID_EP_BY_ID(worker, rts->super.sreq.ep_id,
{ status = UCS_ERR_CANCELED;
goto out_send_ats;
Expand Down Expand Up @@ -1452,7 +1460,6 @@ ucs_status_t ucp_am_rndv_process_rts(void *arg, void *data, size_t length,
goto out_send_ats;
}

am_cb = &ucs_array_elem(&worker->am, am_id);
param.recv_attr = UCP_AM_RECV_ATTR_FLAG_RNDV |
ucp_am_hdr_reply_ep(worker, rts->am.flags, ep,
&param.reply_ep);
Expand Down
17 changes: 13 additions & 4 deletions test/gtest/ucp/test_ucp_am.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void test_ucp_am::do_set_am_handler_realloc_test()
do_send_process_data_test(0, UCP_SEND_ID + 1, 0);
}

UCS_TEST_P(test_ucp_am, send_process_am, "RNDV_THRESH=-1")
UCS_TEST_P(test_ucp_am, send_process_am)
{
set_handlers(UCP_SEND_ID);
do_send_process_data_test(0, UCP_SEND_ID, 0);
Expand All @@ -263,13 +263,22 @@ UCS_TEST_P(test_ucp_am, send_process_am, "RNDV_THRESH=-1")
do_send_process_data_test(0, UCP_SEND_ID, UCP_AM_SEND_REPLY);
}

UCS_TEST_P(test_ucp_am, send_process_am_release, "RNDV_THRESH=-1")
UCS_TEST_P(test_ucp_am, send_process_am_rndv, "RNDV_THRESH=1")
{
set_handlers(UCP_SEND_ID);
do_send_process_data_test(0, UCP_SEND_ID, 0);

set_reply_handlers();
do_send_process_data_test(0, UCP_SEND_ID, UCP_AM_SEND_REPLY);
}

UCS_TEST_P(test_ucp_am, send_process_am_release)
{
set_handlers(UCP_SEND_ID);
do_send_process_data_test(UCP_RELEASE, 0, 0);
}

UCS_TEST_P(test_ucp_am, send_process_iov_am, "RNDV_THRESH=-1")
UCS_TEST_P(test_ucp_am, send_process_iov_am)
{
ucs::detail::message_stream ms("INFO");

Expand All @@ -285,7 +294,7 @@ UCS_TEST_P(test_ucp_am, send_process_iov_am, "RNDV_THRESH=-1")
}
}

UCS_TEST_P(test_ucp_am, set_am_handler_realloc, "RNDV_THRESH=-1")
UCS_TEST_P(test_ucp_am, set_am_handler_realloc)
{
do_set_am_handler_realloc_test();
}
Expand Down

0 comments on commit 590848b

Please sign in to comment.