From 814c4c689f4ea537e74387356d5186c339dd6561 Mon Sep 17 00:00:00 2001 From: binl Date: Fri, 14 Aug 2020 10:53:23 +0300 Subject: [PATCH] TEST/APPS/IO_DEMO: Merge iodemo changes from openucx:master branch. --- test/apps/iodemo/io_demo.cc | 264 ++++++++++++++++++++++++++------ test/apps/iodemo/ucx_wrapper.cc | 39 ++--- test/apps/iodemo/ucx_wrapper.h | 10 +- 3 files changed, 243 insertions(+), 70 deletions(-) diff --git a/test/apps/iodemo/io_demo.cc b/test/apps/iodemo/io_demo.cc index 6528df38e14..3e95785e49c 100644 --- a/test/apps/iodemo/io_demo.cc +++ b/test/apps/iodemo/io_demo.cc @@ -17,7 +17,9 @@ #include #include #include +#include #include +#include #define ALIGNMENT 4096 @@ -43,6 +45,7 @@ typedef struct { size_t iomsg_size; size_t min_data_size; size_t max_data_size; + size_t chunk_size; long iter_count; long window_size; std::vector operations; @@ -54,6 +57,80 @@ typedef struct { #define LOG UcxLog("[DEMO]", true) #define VERBOSE_LOG UcxLog("[DEMO]", _test_opts.verbose) +template +class MemoryPool { +public: + MemoryPool(size_t buffer_size = 0) : + _num_allocated(0), _buffer_size(buffer_size) { + } + + ~MemoryPool() { + if (_num_allocated != _freelist.size()) { + LOG << "Some items were not freed. Total:" << _num_allocated + << ", current:" << _freelist.size() << "."; + } + + for (size_t i = 0; i < _freelist.size(); i++) { + delete _freelist[i]; + } + } + + T * get() { + T * item; + + if (_freelist.empty()) { + item = new T(_buffer_size, this); + _num_allocated++; + } else { + item = _freelist.back(); + _freelist.pop_back(); + } + return item; + } + + void put(T * item) { + _freelist.push_back(item); + } + +private: + std::vector _freelist; + uint32_t _num_allocated; + size_t _buffer_size; +}; + +/** + * Linear congruential generator (LCG): + * n[i + 1] = (n[i] * A + C) % M + * where A, C, M used as in glibc + */ +class IoDemoRandom { +public: + static void srand(unsigned seed) { + _seed = seed & _M; + } + + static inline int rand(int min = std::numeric_limits::min(), + int max = std::numeric_limits::max()) { + _seed = (_seed * _A + _C) & _M; + /* To resolve that LCG returns alternating even/odd values */ + if (max - min == 1) { + return (_seed & 0x100) ? max : min; + } else { + return (int)_seed % (max - min + 1) + min; + } + } + +private: + static unsigned _seed; + static const unsigned _A; + static const unsigned _C; + static const unsigned _M; +}; +unsigned IoDemoRandom::_seed = 0; +const unsigned IoDemoRandom::_A = 1103515245U; +const unsigned IoDemoRandom::_C = 12345U; +const unsigned IoDemoRandom::_M = 0x7fffffffU; + class P2pDemoCommon : public UcxContext { protected: @@ -64,13 +141,23 @@ class P2pDemoCommon : public UcxContext { size_t data_size; } iomsg_hdr_t; + typedef enum { + XFER_TYPE_SEND, + XFER_TYPE_RECV + } xfer_type_t; + /* Asynchronous IO message */ class IoMessage : public UcxCallback { public: - IoMessage(size_t buffer_size, io_op_t op, uint32_t sn, size_t data_size) : - _buffer(malloc(buffer_size)) { + IoMessage(size_t buffer_size, MemoryPool* pool) { + _buffer = malloc(buffer_size); + _pool = pool; + _buffer_size = buffer_size; + } + + void init(io_op_t op, uint32_t sn, size_t data_size) { iomsg_hdr_t *hdr = reinterpret_cast(_buffer); - assert(sizeof(*hdr) <= buffer_size); + assert(sizeof(*hdr) <= _buffer_size); hdr->op = op; hdr->sn = sn; hdr->data_size = data_size; @@ -81,7 +168,7 @@ class P2pDemoCommon : public UcxContext { } virtual void operator()(ucs_status_t status) { - delete this; + _pool->put(this); } void *buffer() { @@ -89,12 +176,14 @@ class P2pDemoCommon : public UcxContext { } private: - void *_buffer; + void* _buffer; + MemoryPool* _pool; + size_t _buffer_size; }; P2pDemoCommon(const options_t& test_opts) : UcxContext(test_opts.iomsg_size), _test_opts(test_opts), - _cur_buffer_idx(0) { + _io_msg_pool(opts().iomsg_size), _cur_buffer_idx(0), _padding(0) { _data_buffers.resize(opts().num_buffers); for (size_t i = 0; i < _data_buffers.size(); ++i) { @@ -113,28 +202,71 @@ class P2pDemoCommon : public UcxContext { return &_data_buffers[_cur_buffer_idx][_padding]; } + inline void *buffer(size_t offset) { + return &_data_buffers[_cur_buffer_idx][_padding + offset]; + } + inline void next_buffer() { _cur_buffer_idx = (_cur_buffer_idx + 1) % _data_buffers.size(); assert(_cur_buffer_idx < opts().num_buffers); } inline size_t get_data_size() { - return opts().min_data_size + - (std::rand() % static_cast(opts().max_data_size - - opts().min_data_size + 1)); + return IoDemoRandom::rand(opts().min_data_size, + opts().max_data_size); } bool send_io_message(UcxConnection *conn, io_op_t op, uint32_t sn, size_t data_size) { - IoMessage *m = new IoMessage(opts().iomsg_size, op, sn, - data_size); + IoMessage *m = _io_msg_pool.get(); + m->init(op, sn, data_size); VERBOSE_LOG << "sending IO " << io_op_names[op] << ", sn " << sn << " data size " << data_size; return conn->send_io_message(m->buffer(), opts().iomsg_size, m); } + void send_recv_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn, + xfer_type_t send_recv_data, + UcxCallback* callback = EmptyCallback::get()) { + size_t remaining = data_size; + while (remaining > 0) { + size_t xfer_size = std::min(opts().chunk_size, remaining); + if (send_recv_data == XFER_TYPE_SEND) { + conn->send_data(buffer(data_size - remaining), xfer_size, sn, callback); + } else { + conn->recv_data(buffer(data_size - remaining), xfer_size, sn, callback); + } + remaining -= xfer_size; + } + } + + void send_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn, + UcxCallback* callback = EmptyCallback::get()) { + send_recv_data_as_chunks(conn, data_size, sn, XFER_TYPE_SEND, callback); + } + + void recv_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn, + UcxCallback* callback = EmptyCallback::get()) { + send_recv_data_as_chunks(conn, data_size, sn, XFER_TYPE_RECV, callback); + } + + uint32_t get_chunk_cnt(size_t data_size) { + return (data_size + opts().chunk_size - 1) / opts().chunk_size; + } + + void send_data(UcxConnection* conn, size_t data_size, uint32_t sn, + UcxCallback* callback = EmptyCallback::get()) { + send_data_as_chunks(conn, data_size, sn, callback); + } + + void recv_data(UcxConnection* conn, size_t data_size, uint32_t sn, + UcxCallback* callback = EmptyCallback::get()) { + recv_data_as_chunks(conn, data_size, sn, callback); + } + protected: const options_t _test_opts; + MemoryPool _io_msg_pool; private: std::vector _data_buffers; @@ -148,26 +280,42 @@ class DemoServer : public P2pDemoCommon { // sends an IO response when done class IoWriteResponseCallback : public UcxCallback { public: - IoWriteResponseCallback(DemoServer *server, UcxConnection* conn, - uint32_t sn, size_t data_size) : - _server(server), _conn(conn), _sn(sn), _data_size(data_size) { + IoWriteResponseCallback(size_t buffer_size, + MemoryPool* pool) : + _server(NULL), _conn(NULL), _sn(0), _data_size(0), _chunk_cnt(0) { + _pool = pool; + } + + void init(DemoServer *server, UcxConnection* conn, uint32_t sn, + size_t data_size, uint32_t chunk_cnt = 1) { + _server = server; + _conn = conn; + _sn = sn; + _data_size = data_size; + _chunk_cnt = chunk_cnt; } virtual void operator()(ucs_status_t status) { + if (--_chunk_cnt > 0) { + return; + } if (status == UCS_OK) { _server->send_io_message(_conn, IO_COMP, _sn, _data_size); } - delete this; + _pool->put(this); } private: - DemoServer* _server; - UcxConnection* _conn; - uint32_t _sn; - size_t _data_size; + DemoServer* _server; + UcxConnection* _conn; + uint32_t _sn; + size_t _data_size; + uint32_t _chunk_cnt; + MemoryPool* _pool; }; - DemoServer(const options_t& test_opts) : P2pDemoCommon(test_opts) { + DemoServer(const options_t& test_opts) : + P2pDemoCommon(test_opts), _callback_pool(0) { } void run() { @@ -191,12 +339,13 @@ class DemoServer : public P2pDemoCommon { // send data VERBOSE_LOG << "sending IO read data"; assert(opts().max_data_size >= hdr->data_size); - conn->send_data(buffer(), hdr->data_size, hdr->sn); + send_data(conn, hdr->data_size, hdr->sn); + // send response as data VERBOSE_LOG << "sending IO read response"; - IoMessage *response = new IoMessage(opts().iomsg_size, IO_COMP, hdr->sn, - 0); + IoMessage *response = _io_msg_pool.get(); + response->init(IO_COMP, hdr->sn, 0); conn->send_data(response->buffer(), opts().iomsg_size, hdr->sn, response); @@ -206,9 +355,11 @@ class DemoServer : public P2pDemoCommon { void handle_io_write_request(UcxConnection* conn, const iomsg_hdr_t *hdr) { VERBOSE_LOG << "receiving IO write data"; assert(opts().max_data_size >= hdr->data_size); - conn->recv_data(buffer(), hdr->data_size, hdr->sn, - new IoWriteResponseCallback(this, conn, hdr->sn, - hdr->data_size)); + assert(hdr->data_size != 0); + + IoWriteResponseCallback *w = _callback_pool.get(); + w->init(this, conn, hdr->sn, hdr->data_size, get_chunk_cnt(hdr->data_size)); + recv_data(conn, hdr->data_size, hdr->sn, w); next_buffer(); } @@ -234,6 +385,8 @@ class DemoServer : public P2pDemoCommon { LOG << "Invalid opcode: " << hdr->op; } } +protected: + MemoryPool _callback_pool; }; @@ -241,8 +394,17 @@ class DemoClient : public P2pDemoCommon { public: class IoReadResponseCallback : public UcxCallback { public: - IoReadResponseCallback(long *counter, size_t iomsg_size) : - _counter(0), _io_counter(counter), _buffer(malloc(iomsg_size)) { + IoReadResponseCallback(size_t buffer_size, + MemoryPool* pool) : + _counter(0), _io_counter(0), _chunk_cnt(0) { + _buffer = malloc(buffer_size); + _pool = pool; + } + + void init(long *counter, uint32_t chunk_cnt = 1) { + _counter = 0; + _io_counter = counter; + _chunk_cnt = chunk_cnt; } ~IoReadResponseCallback() { @@ -251,12 +413,12 @@ class DemoClient : public P2pDemoCommon { virtual void operator()(ucs_status_t status) { /* wait data and response completion */ - if (++_counter < 2) { + if (++_counter < (1 + _chunk_cnt)) { return; } ++(*_io_counter); - delete this; + _pool->put(this); } void* buffer() { @@ -264,14 +426,17 @@ class DemoClient : public P2pDemoCommon { } private: - long _counter; - long* _io_counter; - void* _buffer; + long _counter; + long* _io_counter; + uint32_t _chunk_cnt; + void* _buffer; + MemoryPool* _pool; }; DemoClient(const options_t& test_opts) : P2pDemoCommon(test_opts), - _num_sent(0), _num_completed(0), _error_flag(true), _retry(0) + _num_sent(0), _num_completed(0), _error_flag(true), + _retry(0), _callback_pool(opts().iomsg_size) { } @@ -283,11 +448,10 @@ class DemoClient : public P2pDemoCommon { } ++_num_sent; - IoReadResponseCallback *response = - new IoReadResponseCallback(&_num_completed, opts().iomsg_size); - conn->recv_data(buffer(), data_size, sn, response); - conn->recv_data(response->buffer(), opts().iomsg_size, sn, response); - + IoReadResponseCallback *r = _callback_pool.get(); + r->init(&_num_completed, get_chunk_cnt(data_size)); + recv_data(conn, data_size, sn, r); + conn->recv_data(r->buffer(), opts().iomsg_size, sn, r); next_buffer(); return data_size; @@ -303,8 +467,7 @@ class DemoClient : public P2pDemoCommon { ++_num_sent; VERBOSE_LOG << "sending data " << buffer() << " size " << data_size << " sn " << sn; - conn->send_data(buffer(), data_size, sn); - + send_data(conn, data_size, sn); next_buffer(); return data_size; @@ -430,7 +593,7 @@ class DemoClient : public P2pDemoCommon { break; } - size_t conn_num = std::rand() % conn.size(); + size_t conn_num = IoDemoRandom::rand(0, conn.size() - 1); io_op_t op = get_op(); size_t size; switch (op) { @@ -499,8 +662,8 @@ class DemoClient : public P2pDemoCommon { return opts().operations[0]; } - return opts().operations[std::rand() % - opts().operations.size()]; + return opts().operations[IoDemoRandom::rand( + 0, opts().operations.size() - 1)]; } void report_performance(long num_iters, double elapsed, @@ -546,6 +709,8 @@ class DemoClient : public P2pDemoCommon { long _num_completed; bool _error_flag; unsigned _retry; +protected: + MemoryPool _callback_pool; }; static int set_data_size(char *str, options_t* test_opts) @@ -590,6 +755,7 @@ static int parse_args(int argc, char **argv, options_t* test_opts) test_opts->client_timeout = 1.0; test_opts->min_data_size = 4096; test_opts->max_data_size = 4096; + test_opts->chunk_size = std::numeric_limits::max(); test_opts->num_buffers = 1; test_opts->iomsg_size = 256; test_opts->iter_count = 1000; @@ -597,7 +763,7 @@ static int parse_args(int argc, char **argv, options_t* test_opts) test_opts->random_seed = std::time(NULL); test_opts->verbose = false; - while ((c = getopt(argc, argv, "p:c:r:d:b:i:w:o:t:s:v")) != -1) { + while ((c = getopt(argc, argv, "p:c:r:d:b:i:w:k:o:t:s:v")) != -1) { switch (c) { case 'p': test_opts->port_num = atoi(optarg); @@ -628,6 +794,9 @@ static int parse_args(int argc, char **argv, options_t* test_opts) case 'w': test_opts->window_size = atoi(optarg); break; + case 'k': + test_opts->chunk_size = strtol(optarg, NULL, 0); + break; case 'o': str = strtok(optarg, ","); while (str != NULL) { @@ -674,13 +843,14 @@ static int parse_args(int argc, char **argv, options_t* test_opts) std::cout << "Supported options are:" << std::endl; std::cout << " -p TCP port number to use" << std::endl; std::cout << " -o Comma-separated string of IO operations [read|write]" << std::endl; - std::cout << " NOTE: if using \"random\", performance" << std::endl; + std::cout << " NOTE: if using several IO operations, performance" << std::endl; std::cout << " measurments may be inaccurate" << std::endl; std::cout << " -d : Range that should be used to get data" << std::endl; std::cout << " size of IO payload" << std::endl; std::cout << " -b Number of IO buffers to use for communications" << std::endl; std::cout << " -i Number of iterations to run communication" << std::endl; std::cout << " -w Number of outstanding requests" << std::endl; + std::cout << " -k Split the data transfer to chunks of this size" << std::endl; std::cout << " -r Size of IO request packet" << std::endl; std::cout << " -c Number of retries on client for failure" << std::endl; std::cout << " -t Client timeout, in seconds" << std::endl; @@ -715,7 +885,7 @@ static int do_server(const options_t& test_opts) static int do_client(const options_t& test_opts) { - std::srand(test_opts.random_seed); + IoDemoRandom::srand(test_opts.random_seed); LOG << "random seed: " << test_opts.random_seed; DemoClient client(test_opts); diff --git a/test/apps/iodemo/ucx_wrapper.cc b/test/apps/iodemo/ucx_wrapper.cc index d804e6fd7fd..495142f6ba6 100644 --- a/test/apps/iodemo/ucx_wrapper.cc +++ b/test/apps/iodemo/ucx_wrapper.cc @@ -22,7 +22,7 @@ struct ucx_request { bool completed; uint32_t conn_id; size_t recv_length; - ucx_request_list_t::iterator pos; + ucs_list_link_t pos; }; UcxCallback::~UcxCallback() @@ -39,7 +39,7 @@ EmptyCallback* EmptyCallback::get() { return &instance; } -UcxLog::UcxLog(const std::string& prefix, bool enable) : _enable(enable) +UcxLog::UcxLog(const char* prefix, bool enable) : _enable(enable) { if (enable) { std::cout << prefix << " "; @@ -189,7 +189,8 @@ void UcxContext::request_reset(ucx_request *r) r->callback = NULL; r->conn = NULL; r->recv_length = 0; - r->pos = ucx_request_list_t::iterator(); + r->pos.next = NULL; + r->pos.prev = NULL; } void UcxContext::request_release(void *request) @@ -397,6 +398,7 @@ UcxConnection::UcxConnection(UcxContext &context, uint32_t conn_id) : struct sockaddr_in in_addr = {0}; in_addr.sin_family = AF_INET; set_log_prefix((const struct sockaddr*)&in_addr, sizeof(in_addr)); + ucs_list_head_init(&_all_requests); UCX_CONN_LOG << "created new connection, total: " << _num_instances; } @@ -412,11 +414,11 @@ UcxConnection::~UcxConnection() } // wait until all requests are completed - if (!_all_requests.empty()) { - UCX_CONN_LOG << "waiting for " << _all_requests.size() << + if (!ucs_list_is_empty(&_all_requests)) { + UCX_CONN_LOG << "waiting for " << ucs_list_length(&_all_requests) << " uncompleted requests"; } - while (!_all_requests.empty()) { + while (!ucs_list_is_empty(&_all_requests)) { ucp_worker_progress(_context.worker()); } @@ -490,18 +492,14 @@ bool UcxConnection::recv_data(void *buffer, size_t length, uint32_t sn, void UcxConnection::cancel_all() { - if (_all_requests.empty()) { + if (ucs_list_is_empty(&_all_requests)) { return; } - ucx_request_list_t requests; - std::copy(_all_requests.begin(), _all_requests.end(), - std::back_insert_iterator(requests)); - - unsigned count = 0; - for (ucx_request_list_t::iterator iter = requests.begin(); - iter != requests.end(); ++iter) { - ucp_request_cancel(_context.worker(), *iter); + ucx_request *request, *tmp; + unsigned count = 0; + ucs_list_for_each_safe(request, tmp, &_all_requests, pos) { + ucp_request_cancel(_context.worker(), request); ++count; } @@ -561,7 +559,12 @@ void UcxConnection::set_log_prefix(const struct sockaddr* saddr, std::stringstream ss; ss << "[UCX-connection #" << _conn_id << " " << UcxContext::sockaddr_str(saddr, addrlen) << "]"; - _log_prefix = ss.str(); + memset(_log_prefix, 0, MAX_LOG_PREFIX_SIZE); + int length = ss.str().length(); + if (length >= MAX_LOG_PREFIX_SIZE) { + length = MAX_LOG_PREFIX_SIZE - 1; + } + memcpy(_log_prefix, ss.str().c_str(), length); } bool UcxConnection::connect_common(ucp_ep_params_t& ep_params) @@ -636,13 +639,13 @@ bool UcxConnection::send_common(const void *buffer, size_t length, ucp_tag_t tag void UcxConnection::request_started(ucx_request *r) { - r->pos = _all_requests.insert(_all_requests.end(), r); + ucs_list_add_tail(&_all_requests, &r->pos); } void UcxConnection::request_completed(ucx_request *r) { assert(r->conn == this); - _all_requests.erase(r->pos); + ucs_list_del(&r->pos); } void UcxConnection::handle_connection_error(ucs_status_t status) diff --git a/test/apps/iodemo/ucx_wrapper.h b/test/apps/iodemo/ucx_wrapper.h index 93d474a640a..78497417d7c 100644 --- a/test/apps/iodemo/ucx_wrapper.h +++ b/test/apps/iodemo/ucx_wrapper.h @@ -14,13 +14,13 @@ #include #include #include -#include +#include +#define MAX_LOG_PREFIX_SIZE 64 /* Forward declarations */ class UcxConnection; struct ucx_request; -typedef std::list ucx_request_list_t; /* * UCX callback for send/receive completion @@ -49,7 +49,7 @@ class EmptyCallback : public UcxCallback { */ class UcxLog { public: - UcxLog(const std::string& prefix, bool enable); + UcxLog(const char* prefix, bool enable); ~UcxLog(); @@ -227,10 +227,10 @@ class UcxConnection { UcxContext& _context; uint32_t _conn_id; uint32_t _remote_conn_id; - std::string _log_prefix; + char _log_prefix[MAX_LOG_PREFIX_SIZE]; ucp_ep_h _ep; void* _close_request; - ucx_request_list_t _all_requests; + ucs_list_link_t _all_requests; }; #endif