Skip to content

Commit

Permalink
Merge pull request #3719 from yosefe/topic/ucm-fix-event-test-when-no…
Browse files Browse the repository at this point in the history
…t-installed-v1.6.x

Port fixes to v1.6.x: UCM events test, client/server example
  • Loading branch information
yosefe authored Jun 19, 2019
2 parents 43afe2e + bd027f1 commit 67b1623
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 66 deletions.
4 changes: 3 additions & 1 deletion NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
##
#

## 1.6.0-rc2 (June 13, 2019)
## 1.6.0-rc3 (June 19, 2019)
Features:
- Modular architecture for UCT transports
- ROCm transport re-design: support for managed memory, direct copy, ROCm GDR
Expand All @@ -33,6 +33,8 @@ Bugfixes:
- Fix data race in UCP proxy endpoint
- Static checker fixes
- Fallback to ibv_create_cq() if ibv_create_cq_ex() returns ENOSYS
- Fix malloc hooks test
- Fix checking return status in ucp_client_server example

Tested configurations:
- RDMA: MLNX_OFED 4.5, distribution inbox drivers, rdma-core 22.1
Expand Down
29 changes: 18 additions & 11 deletions src/ucm/event/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,22 @@ void ucm_event_handler_remove(ucm_event_handler_t *handler)
ucm_event_leave();
}

static int ucm_events_to_native_events(int events)
{
int native_events;

native_events = events & ~(UCM_EVENT_VM_MAPPED | UCM_EVENT_VM_UNMAPPED |
UCM_EVENT_MEM_TYPE_ALLOC | UCM_EVENT_MEM_TYPE_FREE);
if (events & UCM_EVENT_VM_MAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_MAPPED;
}
if (events & UCM_EVENT_VM_UNMAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_UNMAPPED;
}

return native_events;
}

static ucs_status_t ucm_event_install(int events)
{
static ucs_init_once_t init_once = UCS_INIT_ONCE_INIITIALIZER;
Expand All @@ -472,14 +488,7 @@ static ucs_status_t ucm_event_install(int events)
}

/* Replace aggregate events with the native events which make them */
native_events = events & ~(UCM_EVENT_VM_MAPPED | UCM_EVENT_VM_UNMAPPED |
UCM_EVENT_MEM_TYPE_ALLOC | UCM_EVENT_MEM_TYPE_FREE);
if (events & UCM_EVENT_VM_MAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_MAPPED;
}
if (events & UCM_EVENT_VM_UNMAPPED) {
native_events |= UCM_NATIVE_EVENT_VM_UNMAPPED;
}
native_events = ucm_events_to_native_events(events);

/* TODO lock */
status = ucm_mmap_install(native_events);
Expand Down Expand Up @@ -590,9 +599,7 @@ void ucm_unset_event_handler(int events, ucm_event_callback_t cb, void *arg)

ucs_status_t ucm_test_events(int events)
{
int out_events;

return ucm_mmap_test_events(events, &out_events);
return ucm_mmap_test_installed_events(ucm_events_to_native_events(events));
}

UCS_STATIC_INIT {
Expand Down
78 changes: 51 additions & 27 deletions src/ucm/mmap/install.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ucm/util/reloc.h>
#include <ucm/util/sys.h>
#include <ucm/bistro/bistro.h>
#include <ucs/sys/preprocessor.h>
#include <ucs/sys/math.h>
#include <ucs/sys/checker.h>
#include <ucs/arch/bitops.h>
Expand All @@ -37,6 +38,8 @@
do { \
(_data)->fired_events = 0; \
_call; \
ucm_trace("after %s: fired events = 0x%x", UCS_PP_MAKE_STRING(_call), \
(_data)->fired_events); \
(_data)->out_events &= ~((_event) & (_mask)) | (_data)->fired_events; \
} while(0)

Expand Down Expand Up @@ -74,6 +77,9 @@ static ucm_mmap_func_t ucm_mmap_funcs[] = {
{ {NULL, NULL}, 0}
};

static pthread_mutex_t ucm_mmap_install_mutex = PTHREAD_MUTEX_INITIALIZER;
static int ucm_mmap_installed_events = 0; /* events that were reported as installed */

static void ucm_mmap_event_test_callback(ucm_event_type_t event_type,
ucm_event_t *event, void *fired_events)
{
Expand All @@ -89,6 +95,8 @@ static void ucm_mmap_event_test_callback(ucm_event_type_t event_type,
static void
ucm_fire_mmap_events_internal(int events, ucm_mmap_test_events_data_t *data)
{
size_t sbrk_size;
int sbrk_mask;
int shmid;
void *p;

Expand Down Expand Up @@ -129,10 +137,18 @@ ucm_fire_mmap_events_internal(int events, ucm_mmap_test_events_data_t *data)
}

if (events & (UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED|UCM_EVENT_VM_UNMAPPED)) {
UCM_FIRE_EVENT(events, UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED,
data, (void)sbrk(ucm_get_page_size()));
UCM_FIRE_EVENT(events, UCM_EVENT_SBRK|UCM_EVENT_VM_UNMAPPED,
data, (void)sbrk(-ucm_get_page_size()));
if (RUNNING_ON_VALGRIND) {
/* on valgrind, doing a non-trivial sbrk() causes heap corruption */
sbrk_size = 0;
sbrk_mask = UCM_EVENT_SBRK;
} else {
sbrk_size = ucm_get_page_size();
sbrk_mask = UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED|UCM_EVENT_VM_UNMAPPED;
}
UCM_FIRE_EVENT(events, (UCM_EVENT_SBRK|UCM_EVENT_VM_MAPPED) & sbrk_mask,
data, (void)sbrk(sbrk_size));
UCM_FIRE_EVENT(events, (UCM_EVENT_SBRK|UCM_EVENT_VM_UNMAPPED) & sbrk_mask,
data, (void)sbrk(-sbrk_size));
}

if (events & UCM_EVENT_MADVISE) {
Expand All @@ -157,7 +173,8 @@ void ucm_fire_mmap_events(int events)
ucm_fire_mmap_events_internal(events, &data);
}

ucs_status_t ucm_mmap_test_events(int events, int *out_events)
/* Called with lock held */
static ucs_status_t ucm_mmap_test_events(int events)
{
ucm_event_handler_t handler;
ucm_mmap_test_events_data_t data;
Expand All @@ -172,32 +189,27 @@ ucs_status_t ucm_mmap_test_events(int events, int *out_events)
ucm_fire_mmap_events_internal(events, &data);
ucm_event_handler_remove(&handler);

*out_events = data.out_events;

ucm_debug("mmap test: got 0x%x out of 0x%x", *out_events, events);
ucm_debug("mmap test: got 0x%x out of 0x%x", data.out_events, events);

/* Return success if we caught all wanted events */
if (!ucs_test_all_flags(*out_events, events)) {
if (!ucs_test_all_flags(data.out_events, events)) {
return UCS_ERR_UNSUPPORTED;
}

return UCS_OK;
}

/* Called with lock held */
static ucs_status_t ucm_mmap_test(int events)
ucs_status_t ucm_mmap_test_installed_events(int events)
{
static int installed_events = 0;
int out_events = 0; /* GCC bug: it reports compilation fail if not initialized */
ucs_status_t status;

if (ucs_test_all_flags(installed_events, events)) {
/* All requested events are already installed */
return UCS_OK;
}

status = ucm_mmap_test_events(events, &out_events);
installed_events |= out_events;
/*
* return UCS_OK iff all installed events are actually working
* we don't check the status of events which were not successfully installed
*/
pthread_mutex_lock(&ucm_mmap_install_mutex);
status = ucm_mmap_test_events(events & ucm_mmap_installed_events);
pthread_mutex_unlock(&ucm_mmap_install_mutex);

return status;
}
Expand Down Expand Up @@ -250,14 +262,18 @@ static ucs_status_t ucs_mmap_install_reloc(int events)

ucs_status_t ucm_mmap_install(int events)
{
static pthread_mutex_t install_mutex = PTHREAD_MUTEX_INITIALIZER;
ucs_status_t status;

pthread_mutex_lock(&install_mutex);
pthread_mutex_lock(&ucm_mmap_install_mutex);

status = ucm_mmap_test(events);
if (status == UCS_OK) {
goto out_unlock;
if (ucs_test_all_flags(ucm_mmap_installed_events, events)) {
/* if we already installed these events, check that they are still
* working, and if not - reinstall them.
*/
status = ucm_mmap_test_events(events);
if (status == UCS_OK) {
goto out_unlock;
}
}

status = ucs_mmap_install_reloc(events);
Expand All @@ -266,9 +282,17 @@ ucs_status_t ucm_mmap_install(int events)
goto out_unlock;
}

status = ucm_mmap_test(events);
status = ucm_mmap_test_events(events);
if (status != UCS_OK) {
ucm_debug("failed to install mmap events");
goto out_unlock;
}

/* status == UCS_OK */
ucm_mmap_installed_events |= events;
ucm_debug("mmap installed events = 0x%x", ucm_mmap_installed_events);

out_unlock:
pthread_mutex_unlock(&install_mutex);
pthread_mutex_unlock(&ucm_mmap_install_mutex);
return status;
}
2 changes: 1 addition & 1 deletion src/ucm/mmap/mmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ int ucm_override_brk(void *addr);
void *ucm_brk_syscall(void *addr);
int ucm_override_madvise(void *addr, size_t length, int advice);
void ucm_fire_mmap_events(int events);
ucs_status_t ucm_mmap_test_events(int events, int *out_events);
ucs_status_t ucm_mmap_test_installed_events(int events);

static UCS_F_ALWAYS_INLINE ucm_mmap_hook_mode_t ucm_mmap_hook_mode(void)
{
Expand Down
52 changes: 31 additions & 21 deletions test/examples/ucp_client_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static int start_client(ucp_worker_h ucp_worker, const char *ip,
static void print_result(int is_server, char *recv_message)
{
if (is_server) {
printf("UCX data message was received\n");
printf("\n\n----- UCP TEST SUCCESS -------\n\n");
printf("%s", recv_message);
printf("\n\n------------------------------\n\n");
Expand All @@ -213,15 +214,32 @@ static void print_result(int is_server, char *recv_message)
}
}

static void request_wait(ucp_worker_h ucp_worker, test_req_t *request)
/**
* Progress the request until it completes.
*/
static ucs_status_t request_wait(ucp_worker_h ucp_worker, test_req_t *request)
{
ucs_status_t status;

/* if operation was completed immediately */
if (request == NULL) {
return UCS_OK;
}

if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
}

while (request->complete == 0) {
ucp_worker_progress(ucp_worker);
}
status = ucp_request_check_status(request);

/* This request may be reused so initialize it for next time */
request->complete = 0;
ucp_request_free(request);

return status;
}

/**
Expand All @@ -235,39 +253,31 @@ static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server)
test_req_t *request;
size_t length;
int ret = 0;
ucs_status_t status;

if (!is_server) {
/* Client sends a message to the server using the stream API */
request = ucp_stream_send_nb(ep, test_message, 1,
ucp_dt_make_contig(TEST_STRING_LEN),
stream_send_cb, 0);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to send UCX message (%s)\n",
ucs_status_string(UCS_PTR_STATUS(request)));
ret = -1;
goto out;
} else if (UCS_PTR_STATUS(request) != UCS_OK) {
request_wait(ucp_worker, request);
}
} else {
/* Server receives a message from the client using the stream API */
request = ucp_stream_recv_nb(ep, &recv_message, 1,
ucp_dt_make_contig(TEST_STRING_LEN),
stream_recv_cb, &length , 0);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to receive UCX message (%s)\n",
ucs_status_string(UCS_PTR_STATUS(request)));
ret = -1;
goto out;
} else {
request_wait(ucp_worker, request);
printf("UCX data message was received\n");
}
stream_recv_cb, &length,
UCP_STREAM_RECV_FLAG_WAITALL);
}

print_result(is_server, recv_message);
status = request_wait(ucp_worker, request);
if (status != UCS_OK){
fprintf(stderr, "unable to %s UCX message (%s)\n",
is_server ? "receive": "send",
ucs_status_string(status));
ret = -1;
} else {
print_result(is_server, recv_message);
}

out:
return ret;
}

Expand Down
16 changes: 11 additions & 5 deletions test/gtest/ucm/malloc_hook.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ UCS_TEST_F(malloc_hook, test_event_failed) {
UCS_TEST_SKIP_R("skipping on non-BISTRO hooks");
}

status = event.set(UCM_EVENT_MUNMAP);
status = event.set(UCM_EVENT_MUNMAP | UCM_EVENT_VM_UNMAPPED);
ASSERT_UCS_OK(status);

/* set hook to mmap call */
Expand All @@ -1021,7 +1021,7 @@ UCS_TEST_F(malloc_hook, test_event_failed) {
status = ucm_test_events(UCM_EVENT_VM_UNMAPPED);
EXPECT_TRUE(status == UCS_ERR_UNSUPPORTED);

/* restore original mmap body */
/* restore original munmap body */
status = ucm_bistro_restore(rp);
ASSERT_UCS_OK(status);
}
Expand All @@ -1040,21 +1040,27 @@ UCS_TEST_F(malloc_hook, test_event_unmap) {
UCS_TEST_SKIP_R("skipping on non-BISTRO hooks");
}

status = event.set(UCM_EVENT_MUNMAP);
status = event.set(UCM_EVENT_MMAP | UCM_EVENT_MUNMAP | UCM_EVENT_VM_UNMAPPED);
ASSERT_UCS_OK(status);

/* set hook to mmap call */
status = ucm_bistro_patch(symbol, (void*)bistro_hook<1>::munmap, &rp);
ASSERT_UCS_OK(status);
EXPECT_NE((intptr_t)rp, NULL);

/* munmap should be broken */
status = ucm_test_events(UCM_EVENT_MUNMAP);
EXPECT_TRUE(status == UCS_ERR_UNSUPPORTED);

status = ucm_test_events(UCM_EVENT_VM_UNMAPPED);
/* vm_unmap should be broken as well, because munmap is broken */
status = ucm_test_events(UCM_EVENT_MUNMAP);
EXPECT_TRUE(status == UCS_ERR_UNSUPPORTED);

/* mmap should still work */
status = ucm_test_events(UCM_EVENT_MMAP);
EXPECT_TRUE(status == UCS_OK);

/* restore original mmap body */
/* restore original munmap body */
status = ucm_bistro_restore(rp);
ASSERT_UCS_OK(status);
}
Expand Down

0 comments on commit 67b1623

Please sign in to comment.