Skip to content

Commit

Permalink
GTEST: Non-contiguous memory send/recv tests (openucx#6/7)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-ma committed Jul 6, 2017
1 parent b1c1d79 commit 14b5ace
Show file tree
Hide file tree
Showing 20 changed files with 312 additions and 39 deletions.
13 changes: 13 additions & 0 deletions src/uct/ib/base/ib_verbs.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
# define IBV_EXP_ACCESS_REMOTE_WRITE IBV_ACCESS_REMOTE_WRITE
# define IBV_EXP_ACCESS_REMOTE_READ IBV_ACCESS_REMOTE_READ
# define IBV_EXP_ACCESS_REMOTE_ATOMIC IBV_ACCESS_REMOTE_ATOMIC
# define IBV_EXP_ACCESS_MW_BIND IBV_ACCESS_MW_BIND
# define ibv_exp_reg_shared_mr ibv_reg_shared_mr_ex
# define ibv_exp_reg_shared_mr_in ibv_reg_shared_mr_in
# define ibv_exp_query_device ibv_query_device
Expand Down Expand Up @@ -205,6 +206,18 @@ static inline int ibv_exp_cq_ignore_overrun(struct ibv_cq *cq)
#endif


/*
* Fast memory registration (UMR) support
*/
#if HAVE_STRUCT_IBV_EXP_DEVICE_ATTR_UMR_CAPS
# define IBV_EXP_HAVE_UMR(_attr) ((_attr)->exp_device_cap_flags & IBV_EXP_DEVICE_UMR)
# define IBV_DEVICE_UMR_CAPS(_attr, _field) ((_attr)->umr_caps._field)
#else
# define IBV_EXP_HAVE_UMR(_attr) 0
# define IBV_DEVICE_UMR_CAPS(_attr, _field) 0
#endif


typedef uint8_t uct_ib_uint24_t[3];

static inline void uct_ib_pack_uint24(uct_ib_uint24_t buf, const uint32_t qp_num)
Expand Down
2 changes: 2 additions & 0 deletions src/uct/ib/dc/verbs/dc_verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <uct/api/uct.h>
#include <uct/ib/base/ib_device.h>
#include <uct/ib/base/ib_log.h>
#include <uct/ib/base/ib_umr.h>
#include <uct/ib/base/ib_md.h>
#include <uct/base/uct_md.h>
#include <ucs/arch/bitops.h>
#include <ucs/arch/cpu.h>
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/rc/verbs/rc_verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <ucs/arch/bitops.h>
#include <uct/ib/base/ib_log.h>
#include <uct/ib/base/ib_umr.h>

static UCS_F_ALWAYS_INLINE void
uct_rc_verbs_ep_post_send(uct_rc_verbs_iface_t* iface, uct_rc_verbs_ep_t* ep,
Expand Down
6 changes: 6 additions & 0 deletions src/uct/ib/rc/verbs/rc_verbs_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <uct/ib/rc/base/rc_iface.h>
#include <uct/ib/base/ib_device.h>
#include <uct/ib/base/ib_log.h>
#include <uct/ib/base/ib_md.h>
#include <uct/base/uct_md.h>
#include <ucs/arch/bitops.h>
#include <ucs/arch/cpu.h>
Expand Down Expand Up @@ -67,6 +68,7 @@ static void uct_rc_verbs_handle_failure(uct_ib_iface_t *ib_iface, void *arg)
ucs_log(iface->super.super.config.failure_level,
"Send completion with error: %s",
ibv_wc_status_str(wc->status));
printf("error happended on ep=%p WR_id=%lu\n", ep, wc->wr_id);

uct_rc_ep_failed_purge_outstanding(&ep->super.super.super, ib_iface,
&ep->super.txqp);
Expand Down Expand Up @@ -643,6 +645,10 @@ void uct_rc_verbs_iface_tag_query(uct_rc_verbs_iface_t *iface,
iface_attr->cap.tag.recv.min_recv = 0;
}
#endif

#if (HAVE_IBV_EXP_QP_CREATE_UMR_CAPS || HAVE_EXP_UMR_NEW_API)
iface_attr->cap.flags |= UCT_IFACE_FLAG_MEM_NC;
#endif
}

static ucs_status_t uct_rc_verbs_iface_query(uct_iface_h tl_iface, uct_iface_attr_t *iface_attr)
Expand Down
9 changes: 6 additions & 3 deletions test/gtest/common/test_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -487,28 +487,31 @@ class message_stream {
/**
* Make uct_iov_t iov[iovcnt] array with pointer elements to original buffer
*/
#define UCS_TEST_GET_BUFFER_IOV(_name_iov, _name_iovcnt, _buffer_ptr, _buffer_length, _memh, _iovcnt) \
#define UCS_TEST_GET_BUFFER_IOV(_name_iov, _name_iovcnt, _buffer_ptr,\
_buffer_length, _memh, _iovcnt, _is_strided) \
uct_iov_t _name_iov[_iovcnt]; \
const size_t _name_iovcnt = _iovcnt; \
const size_t _buffer_iov_length = _buffer_length / _name_iovcnt; \
size_t _buffer_iov_length_it = 0; \
for (size_t iov_it = 0; iov_it < _name_iovcnt; ++iov_it) { \
_name_iov[iov_it].buffer = (char *)(_buffer_ptr) + _buffer_iov_length_it; \
_name_iov[iov_it].count = 1; \
_name_iov[iov_it].stride = 0; \
_name_iov[iov_it].memh = _memh; \
if (iov_it == (_name_iovcnt - 1)) { /* Last iteration */ \
_name_iov[iov_it].length = _buffer_length - _buffer_iov_length_it; \
} else { \
_name_iov[iov_it].length = _buffer_iov_length; \
_buffer_iov_length_it += _buffer_iov_length; \
} \
_name_iov[iov_it].stride = _is_strided ? _name_iov[iov_it].length : 0; \
_name_iov[iov_it].ilv_ratio = _is_strided; \
}

/**
* Make ucp_dt_iov_t iov[iovcnt] array with pointer elements to original buffer
*/
#define UCS_TEST_GET_BUFFER_DT_IOV(_name_iov, _name_iovcnt, _buffer_ptr, _buffer_length, _iovcnt) \
#define UCS_TEST_GET_BUFFER_DT_IOV(_name_iov, _name_iovcnt, _buffer_ptr,\
_buffer_length, _iovcnt, _is_strided) \
ucp_dt_iov_t _name_iov[_iovcnt]; \
const size_t _name_iovcnt = _iovcnt; \
const size_t _name_iov##_length = (_buffer_length > _name_iovcnt) ? \
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/ucp/test_ucp_peer_failure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ UCS_TEST_P(test_ucp_peer_failure, disable_sync_send) {
EXPECT_FALSE(UCS_PTR_IS_PTR(req));
EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(req));

UCS_TEST_GET_BUFFER_DT_IOV(iov_, iov_cnt_, buf.data(), size, 40ul);
UCS_TEST_GET_BUFFER_DT_IOV(iov_, iov_cnt_, buf.data(), size, 40ul, 0);
req = send_sync_nb(iov_, iov_cnt_, DATATYPE_IOV, 0x111337);
EXPECT_FALSE(UCS_PTR_IS_PTR(req));
EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(req));
Expand Down
1 change: 1 addition & 0 deletions test/gtest/ucp/test_ucp_tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class test_ucp_tag : public ucp_test {
static const uint32_t MAGIC = 0xd7d7d7d7U;
static const ucp_datatype_t DATATYPE;
static const ucp_datatype_t DATATYPE_IOV;

static ucp_generic_dt_ops test_dt_uint32_ops;
static ucp_generic_dt_ops test_dt_uint32_err_ops;
static ucp_generic_dt_ops test_dt_uint8_ops;
Expand Down
104 changes: 97 additions & 7 deletions test/gtest/ucp/test_ucp_tag_xfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class test_ucp_tag_xfer : public test_ucp_tag {

void test_xfer_contig(size_t size, bool expected, bool sync, bool truncated);
void test_xfer_generic(size_t size, bool expected, bool sync, bool truncated);
void test_xfer_stride(size_t size, bool expected, bool sync, bool truncated);
void test_xfer_iov(size_t size, bool expected, bool sync, bool truncated);
void test_xfer_generic_err(size_t size, bool expected, bool sync, bool truncated);

Expand All @@ -72,6 +73,8 @@ class test_ucp_tag_xfer : public test_ucp_tag {
ucp_datatype_t *recv_dt);
void test_xfer_probe(bool send_contig, bool recv_contig,
bool expected, bool sync);
void test_xfer_stride_internal(size_t size, bool expected, bool sync, bool truncated, bool reusable);
void test_xfer_iov_internal(size_t size, bool expected, bool sync, bool truncated, bool stride, bool reusable);

private:
size_t do_xfer(const void *sendbuf, void *recvbuf, size_t count,
Expand All @@ -97,6 +100,8 @@ int check_buffers(const std::vector<char> &sendbuf, const std::vector<char> &rec
for (size_t it = 0; it < recvd; ++it) {
if (sendbuf[it] != recvbuf[it]) {
ms << datatype << ':'
<< " send_buff=" << (void*)sendbuf.data()
<< " recv_buff=" << (void*)recvbuf.data()
<< " send_iovcnt=" << std::dec << send_iovcnt
<< " recv_iovcnt=" << recv_iovcnt << " size=" << size
<< " expected=" << expected << " sync=" << sync
Expand All @@ -111,8 +116,7 @@ int check_buffers(const std::vector<char> &sendbuf, const std::vector<char> &rec
return buffers_equal;
}

void test_ucp_tag_xfer::test_xfer(xfer_func_t func, bool expected, bool sync,
bool truncated)
void test_ucp_tag_xfer::test_xfer(xfer_func_t func, bool expected, bool sync, bool truncated)
{
if (sync) {
skip_err_handling();
Expand Down Expand Up @@ -152,7 +156,7 @@ void test_ucp_tag_xfer::test_xfer_prepare_bufs(uint8_t *sendbuf, uint8_t *recvbu
for (unsigned i = 0; i < count; ++i) {
sendbuf[i] = i % 256;
}
*send_dt = DATATYPE;
*send_dt = ucp_dt_make_contig(1);
} else {
/* the sender has a generic datatype */
status = ucp_dt_create_generic(&test_dt_uint8_ops, NULL, send_dt);
Expand All @@ -161,7 +165,7 @@ void test_ucp_tag_xfer::test_xfer_prepare_bufs(uint8_t *sendbuf, uint8_t *recvbu

if (recv_contig) {
/* the recv has a contig datatype for the data buffer */
*recv_dt = DATATYPE;
*recv_dt = ucp_dt_make_contig(1);
} else {
/* the receiver has a generic datatype */
status = ucp_dt_create_generic(&test_dt_uint8_ops, NULL, recv_dt);
Expand Down Expand Up @@ -328,25 +332,88 @@ void test_ucp_tag_xfer::test_xfer_generic(size_t size, bool expected, bool sync,
ucp_dt_destroy(dt);
}

void test_ucp_tag_xfer::test_xfer_stride(size_t size, bool expected, bool sync,
bool truncated)
{
test_xfer_stride_internal(size, expected, sync, truncated, false);
test_xfer_stride_internal(size, expected, sync, truncated, true);
}

void test_ucp_tag_xfer::test_xfer_stride_internal(size_t size, bool expected,
bool sync, bool truncated,
bool reusable)
{
std::vector<char> sendbuf(size, 0);
std::vector<char> recvbuf(size, 0);
ucp_datatype_t send_dt;
ucp_datatype_t recv_dt;
if (reusable) {
send_dt = ucp_dt_make_stride_reusable(ucp_dt_make_contig(1), 1, size);
recv_dt = ucp_dt_make_stride_reusable(ucp_dt_make_contig(size), size, 1);
} else {
send_dt = ucp_dt_make_stride(ucp_dt_make_contig(size), size, 1);
recv_dt = ucp_dt_make_stride(ucp_dt_make_contig(1), 1, size);
}

ucs::fill_random(sendbuf.begin(), sendbuf.end());

size_t recvd = do_xfer(sendbuf.data(), recvbuf.data(), 1, send_dt, recv_dt,
expected, sync, truncated);
if (!truncated) {
ASSERT_EQ(sendbuf.size(), recvd);
}
EXPECT_TRUE(!check_buffers(sendbuf, recvbuf, recvd, 1, 1,
size, expected, sync, "STRIDE"));

if (reusable) {
ucp_dt_destroy(send_dt);
ucp_dt_destroy(recv_dt);
}
}

void test_ucp_tag_xfer::test_xfer_iov(size_t size, bool expected, bool sync,
bool truncated)
{
test_xfer_iov_internal(size, expected, sync, truncated, false, false);
test_xfer_iov_internal(size, expected, sync, truncated, true, false);
test_xfer_iov_internal(size, expected, sync, truncated, false, true);
test_xfer_iov_internal(size, expected, sync, truncated, true, true);
}

void test_ucp_tag_xfer::test_xfer_iov_internal(size_t size, bool expected,
bool sync, bool truncated,
bool stride, bool reusable)
{
const size_t iovcnt = 20;
std::vector<char> sendbuf(size, 0);
std::vector<char> recvbuf(size, 0);
ucp_datatype_t send_dt;
ucp_datatype_t recv_dt;
if (reusable) {
send_dt = ucp_dt_make_iov_reusable();
recv_dt = ucp_dt_make_iov_reusable();
} else {
send_dt = ucp_dt_make_iov();
recv_dt = ucp_dt_make_iov();
}

ucs::fill_random(sendbuf.begin(), sendbuf.end());

UCS_TEST_GET_BUFFER_DT_IOV(send_iov, send_iovcnt, sendbuf.data(), sendbuf.size(), iovcnt);
UCS_TEST_GET_BUFFER_DT_IOV(recv_iov, recv_iovcnt, recvbuf.data(), recvbuf.size(), iovcnt);
UCS_TEST_GET_BUFFER_DT_IOV(send_iov, send_iovcnt, sendbuf.data(), sendbuf.size(), iovcnt, stride);
UCS_TEST_GET_BUFFER_DT_IOV(recv_iov, recv_iovcnt, recvbuf.data(), recvbuf.size(), iovcnt, stride);

size_t recvd = do_xfer(&send_iov, &recv_iov, iovcnt, DATATYPE_IOV, DATATYPE_IOV,
size_t recvd = do_xfer(&send_iov, &recv_iov, iovcnt, send_dt, recv_dt,
expected, sync, truncated);
if (!truncated) {
ASSERT_EQ(sendbuf.size(), recvd);
}
EXPECT_TRUE(!check_buffers(sendbuf, recvbuf, recvd, send_iovcnt, recv_iovcnt,
size, expected, sync, "IOV"));

if (reusable) {
ucp_dt_destroy(send_dt);
ucp_dt_destroy(recv_dt);
}
}

void test_ucp_tag_xfer::test_xfer_generic_err(size_t size, bool expected,
Expand Down Expand Up @@ -483,6 +550,18 @@ UCS_TEST_P(test_ucp_tag_xfer, iov_unexp) {
test_xfer(&test_ucp_tag_xfer::test_xfer_iov, false, false, false);
}

UCS_TEST_P(test_ucp_tag_xfer, stride_exp) {
test_xfer(&test_ucp_tag_xfer::test_xfer_stride, true, false, false);
}

UCS_TEST_P(test_ucp_tag_xfer, stride_exp_truncated) {
test_xfer(&test_ucp_tag_xfer::test_xfer_stride, true, false, true);
}

UCS_TEST_P(test_ucp_tag_xfer, stride_unexp) {
test_xfer(&test_ucp_tag_xfer::test_xfer_stride, false, false, false);
}

UCS_TEST_P(test_ucp_tag_xfer, generic_err_exp) {
test_xfer(&test_ucp_tag_xfer::test_xfer_generic_err, true, false, false);
}
Expand Down Expand Up @@ -535,6 +614,17 @@ UCS_TEST_P(test_ucp_tag_xfer, iov_unexp_sync) {
test_xfer(&test_ucp_tag_xfer::test_xfer_iov, false, true, false);
}

UCS_TEST_P(test_ucp_tag_xfer, stride_exp_sync) {
/* because ucp_tag_send_req return status (instead request) if send operation
* completed immediately */
skip_loopback();
test_xfer(&test_ucp_tag_xfer::test_xfer_stride, true, true, false);
}

UCS_TEST_P(test_ucp_tag_xfer, stride_unexp_sync) {
test_xfer(&test_ucp_tag_xfer::test_xfer_stride, false, true, false);
}

/* send_contig_recv_contig */

UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_exp, "RNDV_THRESH=1248576") {
Expand Down
6 changes: 3 additions & 3 deletions test/gtest/ucs/test_rcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class test_rcache : public ucs::test {
region *get(void *address, size_t length, int prot = PROT_READ|PROT_WRITE) {
ucs_status_t status;
ucs_rcache_region_t *r;
status = ucs_rcache_get(m_rcache, address, length, prot, NULL, &r);
status = ucs_rcache_get(m_rcache, address, length, prot, 0, &r);
ASSERT_UCS_OK(status);
EXPECT_TRUE(r != NULL);
struct region *region = ucs_derived_of(r, struct region);
Expand Down Expand Up @@ -123,7 +123,7 @@ class test_rcache : public ucs::test {
private:

static ucs_status_t mem_reg_cb(void *context, ucs_rcache_t *rcache,
void *arg, ucs_rcache_region_t *r)
unsigned flags, ucs_rcache_region_t *r)
{
return reinterpret_cast<test_rcache*>(context)->mem_reg(
ucs_derived_of(r, struct region));
Expand Down Expand Up @@ -494,7 +494,7 @@ UCS_MT_TEST_F(test_rcache_no_register, register_failure, 10) {

ucs_status_t status;
ucs_rcache_region_t *r;
status = ucs_rcache_get(m_rcache, ptr, size, PROT_READ|PROT_WRITE, NULL, &r);
status = ucs_rcache_get(m_rcache, ptr, size, PROT_READ|PROT_WRITE, 0, &r);
EXPECT_EQ(UCS_ERR_IO_ERROR, status);
EXPECT_EQ(0u, m_reg_count);

Expand Down
2 changes: 1 addition & 1 deletion test/gtest/uct/test_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class uct_flush_test : public uct_test {
ucs_status_t status;
UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(),
sendbuf.memh(),
sender().iface_attr().cap.am.max_iov);
sender().iface_attr().cap.am.max_iov, 0);
do {
status = uct_ep_am_zcopy(sender().ep(0), AM_ID, NULL, 0, iov, iovcnt, &zcomp);
} while (status == UCS_ERR_NO_RESOURCE);
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/uct/test_p2p_am.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class uct_p2p_am_test : public uct_p2p_test

UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, ((char*)sendbuf.ptr() + hdr_size),
(sendbuf.length() - hdr_size), sendbuf.memh(),
sender().iface_attr().cap.am.max_iov);
sender().iface_attr().cap.am.max_iov, 0);

return uct_ep_am_zcopy(ep,
AM_ID,
Expand Down
4 changes: 2 additions & 2 deletions test/gtest/uct/test_p2p_err.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class uct_p2p_err_test : public uct_p2p_test {
case OP_PUT_ZCOPY:
{
UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, buffer, length, memh,
sender().iface_attr().cap.put.max_iov);
sender().iface_attr().cap.put.max_iov, 0);
status = uct_ep_put_zcopy(sender_ep(), iov, iovcnt,
remote_addr, rkey, NULL);
}
Expand All @@ -80,7 +80,7 @@ class uct_p2p_err_test : public uct_p2p_test {
break;
case OP_AM_ZCOPY:
{
UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, buffer, 1, memh, 1);
UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, buffer, 1, memh, 1, 0);
status = uct_ep_am_zcopy(sender_ep(), am_id, buffer, length,
iov, iovcnt, NULL);
}
Expand Down
Loading

0 comments on commit 14b5ace

Please sign in to comment.