Skip to content

Commit

Permalink
TEST/EXAMPLES: ucp_stream_recv_nb can return UCS_OK
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev authored and yosefe committed Jun 19, 2019
1 parent 9973a5f commit f3596f4
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions test/examples/ucp_client_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static int start_client(ucp_worker_h ucp_worker, const char *ip,
static void print_result(int is_server, char *recv_message)
{
if (is_server) {
printf("UCX data message was received\n");
printf("\n\n----- UCP TEST SUCCESS -------\n\n");
printf("%s", recv_message);
printf("\n\n------------------------------\n\n");
Expand All @@ -213,15 +214,32 @@ static void print_result(int is_server, char *recv_message)
}
}

static void request_wait(ucp_worker_h ucp_worker, test_req_t *request)
/**
* Progress the request until it completes.
*/
static ucs_status_t request_wait(ucp_worker_h ucp_worker, test_req_t *request)
{
ucs_status_t status;

/* if operation was completed immediately */
if (request == NULL) {
return UCS_OK;
}

if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
}

while (request->complete == 0) {
ucp_worker_progress(ucp_worker);
}
status = ucp_request_check_status(request);

/* This request may be reused so initialize it for next time */
request->complete = 0;
ucp_request_free(request);

return status;
}

/**
Expand All @@ -235,39 +253,31 @@ static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server)
test_req_t *request;
size_t length;
int ret = 0;
ucs_status_t status;

if (!is_server) {
/* Client sends a message to the server using the stream API */
request = ucp_stream_send_nb(ep, test_message, 1,
ucp_dt_make_contig(TEST_STRING_LEN),
stream_send_cb, 0);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to send UCX message (%s)\n",
ucs_status_string(UCS_PTR_STATUS(request)));
ret = -1;
goto out;
} else if (UCS_PTR_STATUS(request) != UCS_OK) {
request_wait(ucp_worker, request);
}
} else {
/* Server receives a message from the client using the stream API */
request = ucp_stream_recv_nb(ep, &recv_message, 1,
ucp_dt_make_contig(TEST_STRING_LEN),
stream_recv_cb, &length , 0);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to receive UCX message (%s)\n",
ucs_status_string(UCS_PTR_STATUS(request)));
ret = -1;
goto out;
} else {
request_wait(ucp_worker, request);
printf("UCX data message was received\n");
}
stream_recv_cb, &length,
UCP_STREAM_RECV_FLAG_WAITALL);
}

print_result(is_server, recv_message);
status = request_wait(ucp_worker, request);
if (status != UCS_OK){
fprintf(stderr, "unable to %s UCX message (%s)\n",
is_server ? "receive": "send",
ucs_status_string(status));
ret = -1;
} else {
print_result(is_server, recv_message);
}

out:
return ret;
}

Expand Down

0 comments on commit f3596f4

Please sign in to comment.