Skip to content

Commit

Permalink
EXAMPLES/HELLO: Add progresss during OOB barrier to solve hang
Browse files Browse the repository at this point in the history
  • Loading branch information
yosefe committed Aug 16, 2021
1 parent 4e27896 commit 58407ae
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
10 changes: 9 additions & 1 deletion examples/hello_world_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <ucs/memory/memory_type.h>

#include <sys/poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down Expand Up @@ -265,8 +266,10 @@ int client_connect(const char *server, uint16_t server_port)
return -1;
}

static inline int barrier(int oob_sock)
static inline int
barrier(int oob_sock, void (*progress_cb)(void *arg), void *arg)
{
struct pollfd pfd = { .fd = oob_sock, .events = POLLIN };
int dummy = 0;
ssize_t res;

Expand All @@ -275,6 +278,11 @@ static inline int barrier(int oob_sock)
return res;
}

do {
res = poll(&pfd, 1, 1);
progress_cb(arg);
} while (res != 1);

res = recv(oob_sock, &dummy, sizeof(dummy), MSG_WAITALL);

/* number of received bytes should be the same as sent */
Expand Down
7 changes: 6 additions & 1 deletion examples/ucp_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ static int run_test(const char *client_target_name, ucp_worker_h ucp_worker)
}
}

static void progress_worker(void *arg)
{
ucp_worker_progress((ucp_worker_h)arg);
}

int main(int argc, char **argv)
{
/* UCP temporary vars */
Expand Down Expand Up @@ -602,7 +607,7 @@ int main(int argc, char **argv)

if (!ret && (err_handling_opt.failure_mode == FAILURE_MODE_NONE)) {
/* Make sure remote is disconnected before destroying local worker */
ret = barrier(oob_sock);
ret = barrier(oob_sock, progress_worker, ucp_worker);
}
close(oob_sock);

Expand Down
9 changes: 7 additions & 2 deletions examples/uct_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,11 @@ int sendrecv(int sock, const void *sbuf, size_t slen, void **rbuf)
return 0;
}

static void progress_worker(void *arg)
{
uct_worker_progress((uct_worker_h)arg);
}

int main(int argc, char **argv)
{
uct_device_addr_t *peer_dev = NULL;
Expand Down Expand Up @@ -660,7 +665,7 @@ int main(int argc, char **argv)

/* Connect endpoint to a remote endpoint */
status = uct_ep_connect_to_ep(ep, peer_dev, peer_ep);
if (barrier(oob_sock)) {
if (barrier(oob_sock, progress_worker, if_info.worker)) {
status = UCS_ERR_IO_ERROR;
goto out_free_ep;
}
Expand Down Expand Up @@ -729,7 +734,7 @@ int main(int argc, char **argv)
}
}

if (barrier(oob_sock)) {
if (barrier(oob_sock, progress_worker, if_info.worker)) {
status = UCS_ERR_IO_ERROR;
}

Expand Down

0 comments on commit 58407ae

Please sign in to comment.