From f3596f4119e3664e1055c0a07e5d2348f824903b Mon Sep 17 00:00:00 2001 From: Sergey Lebedev Date: Tue, 28 May 2019 14:07:58 +0300 Subject: [PATCH] TEST/EXAMPLES: ucp_stream_recv_nb can return UCS_OK --- test/examples/ucp_client_server.c | 52 ++++++++++++++++++------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/test/examples/ucp_client_server.c b/test/examples/ucp_client_server.c index 0ce661d6955..05fb0846172 100644 --- a/test/examples/ucp_client_server.c +++ b/test/examples/ucp_client_server.c @@ -202,6 +202,7 @@ static int start_client(ucp_worker_h ucp_worker, const char *ip, static void print_result(int is_server, char *recv_message) { if (is_server) { + printf("UCX data message was received\n"); printf("\n\n----- UCP TEST SUCCESS -------\n\n"); printf("%s", recv_message); printf("\n\n------------------------------\n\n"); @@ -213,15 +214,32 @@ static void print_result(int is_server, char *recv_message) } } -static void request_wait(ucp_worker_h ucp_worker, test_req_t *request) +/** + * Progress the request until it completes. + */ +static ucs_status_t request_wait(ucp_worker_h ucp_worker, test_req_t *request) { + ucs_status_t status; + + /* if operation was completed immediately */ + if (request == NULL) { + return UCS_OK; + } + + if (UCS_PTR_IS_ERR(request)) { + return UCS_PTR_STATUS(request); + } + while (request->complete == 0) { ucp_worker_progress(ucp_worker); } + status = ucp_request_check_status(request); /* This request may be reused so initialize it for next time */ request->complete = 0; ucp_request_free(request); + + return status; } /** @@ -235,39 +253,31 @@ static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server) test_req_t *request; size_t length; int ret = 0; + ucs_status_t status; if (!is_server) { /* Client sends a message to the server using the stream API */ request = ucp_stream_send_nb(ep, test_message, 1, ucp_dt_make_contig(TEST_STRING_LEN), stream_send_cb, 0); - if (UCS_PTR_IS_ERR(request)) { - fprintf(stderr, "unable to send UCX message (%s)\n", - ucs_status_string(UCS_PTR_STATUS(request))); - ret = -1; - goto out; - } else if (UCS_PTR_STATUS(request) != UCS_OK) { - request_wait(ucp_worker, request); - } } else { /* Server receives a message from the client using the stream API */ request = ucp_stream_recv_nb(ep, &recv_message, 1, ucp_dt_make_contig(TEST_STRING_LEN), - stream_recv_cb, &length , 0); - if (UCS_PTR_IS_ERR(request)) { - fprintf(stderr, "unable to receive UCX message (%s)\n", - ucs_status_string(UCS_PTR_STATUS(request))); - ret = -1; - goto out; - } else { - request_wait(ucp_worker, request); - printf("UCX data message was received\n"); - } + stream_recv_cb, &length, + UCP_STREAM_RECV_FLAG_WAITALL); } - print_result(is_server, recv_message); + status = request_wait(ucp_worker, request); + if (status != UCS_OK){ + fprintf(stderr, "unable to %s UCX message (%s)\n", + is_server ? "receive": "send", + ucs_status_string(status)); + ret = -1; + } else { + print_result(is_server, recv_message); + } -out: return ret; }