Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port fixes to v1.6.x: UCM events test, client/server example #3719

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
##
#

## 1.6.0-rc2 (June 13, 2019)
## 1.6.0-rc3 (June 19, 2019)
Features:
- Modular architecture for UCT transports
- ROCm transport re-design: support for managed memory, direct copy, ROCm GDR
Expand All @@ -33,6 +33,8 @@ Bugfixes:
- Fix data race in UCP proxy endpoint
- Static checker fixes
- Fallback to ibv_create_cq() if ibv_create_cq_ex() returns ENOSYS
- Fix malloc hooks test
- Fix checking return status in ucp_client_server example

Tested configurations:
- RDMA: MLNX_OFED 4.5, distribution inbox drivers, rdma-core 22.1
Expand Down
29 changes: 18 additions & 11 deletions src/ucm/event/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,22 @@ void ucm_event_handler_remove(ucm_event_handler_t *handler)
ucm_event_leave();
}

static int ucm_events_to_native_events(int events)
{
int native_events;

native_events = events & ~(UCM_EVENT_VM_MAPPED | UCM_EVENT_VM_UNMAPPED |
UCM_EVENT_MEM_TYPE_ALLOC | UCM_EVENT_MEM_TYPE_FREE);
if (events & UCM_EVENT_VM_MAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_MAPPED;
}
if (events & UCM_EVENT_VM_UNMAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_UNMAPPED;
}

return native_events;
}

static ucs_status_t ucm_event_install(int events)
{
static ucs_init_once_t init_once = UCS_INIT_ONCE_INIITIALIZER;
Expand All @@ -472,14 +488,7 @@ static ucs_status_t ucm_event_install(int events)
}

/* Replace aggregate events with the native events which make them */
native_events = events & ~(UCM_EVENT_VM_MAPPED | UCM_EVENT_VM_UNMAPPED |
UCM_EVENT_MEM_TYPE_ALLOC | UCM_EVENT_MEM_TYPE_FREE);
if (events & UCM_EVENT_VM_MAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_MAPPED;
}
if (events & UCM_EVENT_VM_UNMAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_UNMAPPED;
}
native_events = ucm_events_to_native_events(events);

/* TODO lock */
status = ucm_mmap_install(native_events);
Expand Down Expand Up @@ -590,9 +599,7 @@ void ucm_unset_event_handler(int events, ucm_event_callback_t cb, void *arg)

ucs_status_t ucm_test_events(int events)
{
int out_events;

return ucm_mmap_test_events(events, &out_events);
return ucm_mmap_test_installed_events(ucm_events_to_native_events(events));
}

UCS_STATIC_INIT {
Expand Down
78 changes: 51 additions & 27 deletions src/ucm/mmap/install.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ucm/util/reloc.h>
#include <ucm/util/sys.h>
#include <ucm/bistro/bistro.h>
#include <ucs/sys/preprocessor.h>
#include <ucs/sys/math.h>
#include <ucs/sys/checker.h>
#include <ucs/arch/bitops.h>
Expand All @@ -37,6 +38,8 @@
do { \
(_data)->fired_events = 0; \
_call; \
ucm_trace("after %s: fired events = 0x%x", UCS_PP_MAKE_STRING(_call), \
(_data)->fired_events); \
(_data)->out_events &= ~((_event) & (_mask)) | (_data)->fired_events; \
} while(0)

Expand Down Expand Up @@ -74,6 +77,9 @@ static ucm_mmap_func_t ucm_mmap_funcs[] = {
{ {NULL, NULL}, 0}
};

static pthread_mutex_t ucm_mmap_install_mutex = PTHREAD_MUTEX_INITIALIZER;
static int ucm_mmap_installed_events = 0; /* events that were reported as installed */

static void ucm_mmap_event_test_callback(ucm_event_type_t event_type,
ucm_event_t *event, void *fired_events)
{
Expand All @@ -89,6 +95,8 @@ static void ucm_mmap_event_test_callback(ucm_event_type_t event_type,
static void
ucm_fire_mmap_events_internal(int events, ucm_mmap_test_events_data_t *data)
{
size_t sbrk_size;
int sbrk_mask;
int shmid;
void *p;

Expand Down Expand Up @@ -129,10 +137,18 @@ ucm_fire_mmap_events_internal(int events, ucm_mmap_test_events_data_t *data)
}

if (events & (UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED|UCM_EVENT_VM_UNMAPPED)) {
UCM_FIRE_EVENT(events, UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED,
data, (void)sbrk(ucm_get_page_size()));
UCM_FIRE_EVENT(events, UCM_EVENT_SBRK|UCM_EVENT_VM_UNMAPPED,
data, (void)sbrk(-ucm_get_page_size()));
if (RUNNING_ON_VALGRIND) {
/* on valgrind, doing a non-trivial sbrk() causes heap corruption */
sbrk_size = 0;
sbrk_mask = UCM_EVENT_SBRK;
} else {
sbrk_size = ucm_get_page_size();
sbrk_mask = UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED|UCM_EVENT_VM_UNMAPPED;
}
UCM_FIRE_EVENT(events, (UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED) & sbrk_mask,
data, (void)sbrk(sbrk_size));
UCM_FIRE_EVENT(events, (UCM_EVENT_SBRK|UCM_EVENT_VM_UNMAPPED) & sbrk_mask,
data, (void)sbrk(-sbrk_size));
}

if (events & UCM_EVENT_MADVISE) {
Expand All @@ -157,7 +173,8 @@ void ucm_fire_mmap_events(int events)
ucm_fire_mmap_events_internal(events, &data);
}

ucs_status_t ucm_mmap_test_events(int events, int *out_events)
/* Called with lock held */
static ucs_status_t ucm_mmap_test_events(int events)
{
ucm_event_handler_t handler;
ucm_mmap_test_events_data_t data;
Expand All @@ -172,32 +189,27 @@ ucs_status_t ucm_mmap_test_events(int events, int *out_events)
ucm_fire_mmap_events_internal(events, &data);
ucm_event_handler_remove(&handler);

*out_events = data.out_events;

ucm_debug("mmap test: got 0x%x out of 0x%x", *out_events, events);
ucm_debug("mmap test: got 0x%x out of 0x%x", data.out_events, events);

/* Return success if we caught all wanted events */
if (!ucs_test_all_flags(*out_events, events)) {
if (!ucs_test_all_flags(data.out_events, events)) {
return UCS_ERR_UNSUPPORTED;
}

return UCS_OK;
}

/* Called with lock held */
static ucs_status_t ucm_mmap_test(int events)
ucs_status_t ucm_mmap_test_installed_events(int events)
{
static int installed_events = 0;
int out_events = 0; /* GCC bug: it reports compilation fail if not initialized */
ucs_status_t status;

if (ucs_test_all_flags(installed_events, events)) {
/* All requested events are already installed */
return UCS_OK;
}

status = ucm_mmap_test_events(events, &out_events);
installed_events |= out_events;
/*
* return UCS_OK iff all installed events are actually working
* we don't check the status of events which were not successfully installed
*/
pthread_mutex_lock(&ucm_mmap_install_mutex);
status = ucm_mmap_test_events(events & ucm_mmap_installed_events);
pthread_mutex_unlock(&ucm_mmap_install_mutex);

return status;
}
Expand Down Expand Up @@ -250,14 +262,18 @@ static ucs_status_t ucs_mmap_install_reloc(int events)

ucs_status_t ucm_mmap_install(int events)
{
static pthread_mutex_t install_mutex = PTHREAD_MUTEX_INITIALIZER;
ucs_status_t status;

pthread_mutex_lock(&install_mutex);
pthread_mutex_lock(&ucm_mmap_install_mutex);

status = ucm_mmap_test(events);
if (status == UCS_OK) {
goto out_unlock;
if (ucs_test_all_flags(ucm_mmap_installed_events, events)) {
/* if we already installed these events, check that they are still
* working, and if not - reinstall them.
*/
status = ucm_mmap_test_events(events);
if (status == UCS_OK) {
goto out_unlock;
}
}

status = ucs_mmap_install_reloc(events);
Expand All @@ -266,9 +282,17 @@ ucs_status_t ucm_mmap_install(int events)
goto out_unlock;
}

status = ucm_mmap_test(events);
status = ucm_mmap_test_events(events);
if (status != UCS_OK) {
ucm_debug("failed to install mmap events");
goto out_unlock;
}

/* status == UCS_OK */
ucm_mmap_installed_events |= events;
ucm_debug("mmap installed events = 0x%x", ucm_mmap_installed_events);

out_unlock:
pthread_mutex_unlock(&install_mutex);
pthread_mutex_unlock(&ucm_mmap_install_mutex);
return status;
}
2 changes: 1 addition & 1 deletion src/ucm/mmap/mmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ int ucm_override_brk(void *addr);
void *ucm_brk_syscall(void *addr);
int ucm_override_madvise(void *addr, size_t length, int advice);
void ucm_fire_mmap_events(int events);
ucs_status_t ucm_mmap_test_events(int events, int *out_events);
ucs_status_t ucm_mmap_test_installed_events(int events);

static UCS_F_ALWAYS_INLINE ucm_mmap_hook_mode_t ucm_mmap_hook_mode(void)
{
Expand Down
52 changes: 31 additions & 21 deletions test/examples/ucp_client_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static int start_client(ucp_worker_h ucp_worker, const char *ip,
static void print_result(int is_server, char *recv_message)
{
if (is_server) {
printf("UCX data message was received\n");
printf("\n\n----- UCP TEST SUCCESS -------\n\n");
printf("%s", recv_message);
printf("\n\n------------------------------\n\n");
Expand All @@ -213,15 +214,32 @@ static void print_result(int is_server, char *recv_message)
}
}

static void request_wait(ucp_worker_h ucp_worker, test_req_t *request)
/**
* Progress the request until it completes.
*/
static ucs_status_t request_wait(ucp_worker_h ucp_worker, test_req_t *request)
{
ucs_status_t status;

/* if operation was completed immediately */
if (request == NULL) {
return UCS_OK;
}

if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
}

while (request->complete == 0) {
ucp_worker_progress(ucp_worker);
}
status = ucp_request_check_status(request);

/* This request may be reused so initialize it for next time */
request->complete = 0;
ucp_request_free(request);

return status;
}

/**
Expand All @@ -235,39 +253,31 @@ static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server)
test_req_t *request;
size_t length;
int ret = 0;
ucs_status_t status;

if (!is_server) {
/* Client sends a message to the server using the stream API */
request = ucp_stream_send_nb(ep, test_message, 1,
ucp_dt_make_contig(TEST_STRING_LEN),
stream_send_cb, 0);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to send UCX message (%s)\n",
ucs_status_string(UCS_PTR_STATUS(request)));
ret = -1;
goto out;
} else if (UCS_PTR_STATUS(request) != UCS_OK) {
request_wait(ucp_worker, request);
}
} else {
/* Server receives a message from the client using the stream API */
request = ucp_stream_recv_nb(ep, &recv_message, 1,
ucp_dt_make_contig(TEST_STRING_LEN),
stream_recv_cb, &length , 0);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to receive UCX message (%s)\n",
ucs_status_string(UCS_PTR_STATUS(request)));
ret = -1;
goto out;
} else {
request_wait(ucp_worker, request);
printf("UCX data message was received\n");
}
stream_recv_cb, &length,
UCP_STREAM_RECV_FLAG_WAITALL);
}

print_result(is_server, recv_message);
status = request_wait(ucp_worker, request);
if (status != UCS_OK){
fprintf(stderr, "unable to %s UCX message (%s)\n",
is_server ? "receive": "send",
ucs_status_string(status));
ret = -1;
} else {
print_result(is_server, recv_message);
}

out:
return ret;
}

Expand Down
16 changes: 11 additions & 5 deletions test/gtest/ucm/malloc_hook.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ UCS_TEST_F(malloc_hook, test_event_failed) {
UCS_TEST_SKIP_R("skipping on non-BISTRO hooks");
}

status = event.set(UCM_EVENT_MUNMAP);
status = event.set(UCM_EVENT_MUNMAP | UCM_EVENT_VM_UNMAPPED);
ASSERT_UCS_OK(status);

/* set hook to mmap call */
Expand All @@ -1021,7 +1021,7 @@ UCS_TEST_F(malloc_hook, test_event_failed) {
status = ucm_test_events(UCM_EVENT_VM_UNMAPPED);
EXPECT_TRUE(status == UCS_ERR_UNSUPPORTED);

/* restore original mmap body */
/* restore original munmap body */
status = ucm_bistro_restore(rp);
ASSERT_UCS_OK(status);
}
Expand All @@ -1040,21 +1040,27 @@ UCS_TEST_F(malloc_hook, test_event_unmap) {
UCS_TEST_SKIP_R("skipping on non-BISTRO hooks");
}

status = event.set(UCM_EVENT_MUNMAP);
status = event.set(UCM_EVENT_MMAP | UCM_EVENT_MUNMAP | UCM_EVENT_VM_UNMAPPED);
ASSERT_UCS_OK(status);

/* set hook to mmap call */
status = ucm_bistro_patch(symbol, (void*)bistro_hook<1>::munmap, &rp);
ASSERT_UCS_OK(status);
EXPECT_NE((intptr_t)rp, NULL);

/* munmap should be broken */
status = ucm_test_events(UCM_EVENT_MUNMAP);
EXPECT_TRUE(status == UCS_ERR_UNSUPPORTED);

status = ucm_test_events(UCM_EVENT_VM_UNMAPPED);
/* vm_unmap should be broken as well, because munmap is broken */
status = ucm_test_events(UCM_EVENT_MUNMAP);
EXPECT_TRUE(status == UCS_ERR_UNSUPPORTED);

/* mmap should still work */
status = ucm_test_events(UCM_EVENT_MMAP);
EXPECT_TRUE(status == UCS_OK);

/* restore original mmap body */
/* restore original munmap body */
status = ucm_bistro_restore(rp);
ASSERT_UCS_OK(status);
}
Expand Down