diff --git a/test/apps/iodemo/io_demo.cc b/test/apps/iodemo/io_demo.cc index d82460d7cd3d..fc53fad8cc09 100644 --- a/test/apps/iodemo/io_demo.cc +++ b/test/apps/iodemo/io_demo.cc @@ -60,6 +60,7 @@ typedef struct { size_t chunk_size; long iter_count; long window_size; + long conn_window_size; std::vector operations; unsigned random_seed; size_t num_offcache_buffers; @@ -855,7 +856,8 @@ class DemoClient : public P2pDemoCommon { public: IoReadResponseCallback(size_t buffer_size, MemoryPool& pool) : - _comp_counter(0), _io_counter(NULL), _server_io_counter(NULL), + _comp_counter(0), _client(NULL), + _server_index(std::numeric_limits::max()), _sn(0), _validate(false), _iov(NULL), _buffer(malloc(buffer_size)), _buffer_size(buffer_size), _meta_comp_counter(0), _pool(pool) { @@ -864,13 +866,13 @@ class DemoClient : public P2pDemoCommon { } } - void init(long *io_counter, long *conn_io_counter, + void init(DemoClient *client, size_t server_index, uint32_t sn, bool validate, BufferIov *iov, int meta_comp_counter = 1) { /* wait for all data chunks and the read response completion */ _comp_counter = iov->size() + meta_comp_counter; - _io_counter = io_counter; - _server_io_counter = conn_io_counter; + _client = client; + _server_index = server_index; _sn = sn; _validate = validate; _iov = iov; @@ -886,8 +888,9 @@ class DemoClient : public P2pDemoCommon { return; } - ++(*_io_counter); - ++(*_server_io_counter); + assert(_server_index != std::numeric_limits::max()); + _client->handle_operation_completion(_server_index, IO_READ); + if (_validate && (status == UCS_OK)) { validate(*_iov, _sn); @@ -912,8 +915,8 @@ class DemoClient : public P2pDemoCommon { private: long _comp_counter; - long* _io_counter; - long* _server_io_counter; + DemoClient* _client; + size_t _server_index; uint32_t _sn; bool _validate; BufferIov* _iov; @@ -924,8 +927,9 @@ class DemoClient : public P2pDemoCommon { }; DemoClient(const options_t& test_opts) : - P2pDemoCommon(test_opts), _prev_connect_time(0), - _num_sent(0), _num_completed(0), + P2pDemoCommon(test_opts), + _num_active_servers_to_use(0), + _prev_connect_time(0), _num_sent(0), _num_completed(0), _status(OK), _start_time(get_time()), _read_callback_pool(opts().iomsg_size, "read callbacks") { } @@ -949,24 +953,48 @@ class DemoClient : public P2pDemoCommon { i->second; } - size_t do_io_read(server_info_t& server_info, uint32_t sn) { - size_t data_size = get_data_size(); - bool validate = opts().validate; + void commit_operation(size_t server_index) { + server_info_t& server_info = _server_info[server_index]; + + assert(get_num_uncompleted(server_info) < opts().conn_window_size); + + ++server_info.num_sent; + ++_num_sent; + if (get_num_uncompleted(server_info) == opts().conn_window_size) { + active_servers_make_unused(server_info.active_index); + } + } + + void handle_operation_completion(size_t server_index, io_op_t op) { + server_info_t& server_info = _server_info[server_index]; + + assert(get_num_uncompleted(server_info) <= opts().conn_window_size); + + if (get_num_uncompleted(server_info) == opts().conn_window_size) { + active_servers_make_used(server_info.active_index); + } + + ++_num_completed; + ++server_info.num_completed[op]; + } + + size_t do_io_read(size_t server_index, uint32_t sn) { + server_info_t& server_info = _server_info[server_index]; + size_t data_size = get_data_size(); + bool validate = opts().validate; if (!send_io_message(server_info.conn, IO_READ, sn, data_size, validate)) { return 0; } - ++server_info.num_sent; - ++_num_sent; - - BufferIov *iov = _data_buffers_pool.get(); - IoReadResponseCallback *r = _read_callback_pool.get(); + commit_operation(server_index); + BufferIov *iov = _data_buffers_pool.get(); iov->init(data_size, _data_chunks_pool, sn, validate); - r->init(&_num_completed, &server_info.num_completed[IO_READ], sn, - validate, iov); + + IoReadResponseCallback *r = _read_callback_pool.get(); + r->init(this, server_index, sn, validate, iov); recv_data(server_info.conn, *iov, sn, r); server_info.conn->recv_data(r->buffer(), opts().iomsg_size, sn, r); @@ -974,54 +1002,55 @@ class DemoClient : public P2pDemoCommon { return data_size; } - size_t do_io_read_am(server_info_t& server_info, uint32_t sn) { - size_t data_size = get_data_size(); + size_t do_io_read_am(size_t server_index, uint32_t sn) { + server_info_t& server_info = _server_info[server_index]; + size_t data_size = get_data_size(); + + commit_operation(server_index); IoMessage *m = _io_msg_pool.get(); m->init(IO_READ, sn, data_size, opts().validate); server_info.conn->send_am(m->buffer(), opts().iomsg_size, NULL, 0, m); - ++server_info.num_sent; - ++_num_sent; - return data_size; } - size_t do_io_write(server_info_t& server_info, uint32_t sn) { - size_t data_size = get_data_size(); - bool validate = opts().validate; + size_t do_io_write(size_t server_index, uint32_t sn) { + server_info_t& server_info = _server_info[server_index]; + size_t data_size = get_data_size(); + bool validate = opts().validate; if (!send_io_message(server_info.conn, IO_WRITE, sn, data_size, validate)) { return 0; } - ++server_info.num_sent; - ++_num_sent; - - BufferIov *iov = _data_buffers_pool.get(); - SendCompleteCallback *cb = _send_callback_pool.get(); + commit_operation(server_index); + BufferIov *iov = _data_buffers_pool.get(); iov->init(data_size, _data_chunks_pool, sn, validate); + + SendCompleteCallback *cb = _send_callback_pool.get(); cb->init(iov, NULL); VERBOSE_LOG << "sending data " << iov << " size " << data_size << " sn " << sn; send_data(server_info.conn, *iov, sn, cb); + return data_size; } - size_t do_io_write_am(server_info_t& server_info, uint32_t sn) { - size_t data_size = get_data_size(); - bool validate = opts().validate; + size_t do_io_write_am(size_t server_index, uint32_t sn) { + server_info_t& server_info = _server_info[server_index]; + size_t data_size = get_data_size(); + bool validate = opts().validate; + + commit_operation(server_index); IoMessage *m = _io_msg_pool.get(); m->init(IO_WRITE, sn, data_size, validate); - ++server_info.num_sent; - ++_num_sent; - BufferIov *iov = _data_buffers_pool.get(); iov->init(data_size, _data_chunks_pool, sn, validate); @@ -1070,8 +1099,7 @@ class DemoClient : public P2pDemoCommon { size_t server_index = get_server_index(conn); if (server_index < _server_info.size()) { - ++_num_completed; - ++_server_info[server_index].num_completed[IO_WRITE]; + handle_operation_completion(server_index, IO_WRITE); } else { /* do not increment _num_completed here since we decremented * _num_sent on connection termination */ @@ -1097,18 +1125,16 @@ class DemoClient : public P2pDemoCommon { } // Client can receive IO_WRITE_COMP or IO_READ_COMP only + size_t server_index = get_server_index(conn); if (msg->op == IO_WRITE_COMP) { assert(msg->op == IO_WRITE_COMP); - ++_num_completed; - ++_server_info[get_server_index(conn)].num_completed[IO_WRITE]; + handle_operation_completion(server_index, IO_WRITE); } else if (msg->op == IO_READ_COMP) { - BufferIov *iov = _data_buffers_pool.get(); - IoReadResponseCallback *r = _read_callback_pool.get(); - + BufferIov *iov = _data_buffers_pool.get(); iov->init(msg->data_size, _data_chunks_pool, msg->sn, opts().validate); - r->init(&_num_completed, - &_server_info[get_server_index(conn)].num_completed[IO_READ], - msg->sn, opts().validate, iov, 0); + + IoReadResponseCallback *r = _read_callback_pool.get(); + r->init(this, server_index, msg->sn, opts().validate, iov, 0); assert(iov->size() == 1); @@ -1116,10 +1142,14 @@ class DemoClient : public P2pDemoCommon { } } + long get_num_uncompleted(const server_info_t& server_info) const { + return server_info.num_sent - + (server_info.num_completed[IO_READ] + + server_info.num_completed[IO_WRITE]); + } + long get_num_uncompleted(size_t server_index) const { - return _server_info[server_index].num_sent - - (_server_info[server_index].num_completed[IO_READ] + - _server_info[server_index].num_completed[IO_WRITE]); + return get_num_uncompleted(_server_info[server_index]); } static void reset_server_info(server_info_t& server_info) { @@ -1151,20 +1181,9 @@ class DemoClient : public P2pDemoCommon { delete server_info.conn; // Don't wait for any more completions on this connection - _num_sent -= get_num_uncompleted(server_index); + _num_sent -= get_num_uncompleted(server_info); - // Replace in _active_servers by the last element in the vector - size_t active_index = server_info.active_index; - std::swap(_active_servers[active_index], _active_servers.back()); - assert(_active_servers.back() == server_index); - - // Swap the active_index field with the "replacement" server_info - server_info_t& replacement_server_info = - _server_info[_active_servers[active_index]]; - std::swap(replacement_server_info.active_index, server_info.active_index); - assert(server_info.active_index == _active_servers.size() - 1); - - _active_servers.pop_back(); + active_servers_remove(server_info.active_index); reset_server_info(server_info); } @@ -1299,11 +1318,9 @@ class DemoClient : public P2pDemoCommon { continue; } - server_info.retry_count = 0; + server_info.retry_count = 0; _server_index_lookup[server_info.conn] = server_index; - - server_info.active_index = _active_servers.size(); - _active_servers.push_back(server_index); + active_servers_add(server_index); LOG << "Connected to " << server_name(server_index); } @@ -1311,6 +1328,20 @@ class DemoClient : public P2pDemoCommon { _prev_connect_time = curr_time; } + size_t pick_server_index() const { + assert(_num_active_servers_to_use != 0); + + /* Pick a random connected server to which the client has credits + * to send (its conn's window is not full) */ + size_t active_index = IoDemoRandom::rand(size_t(0), + _num_active_servers_to_use - 1); + size_t server_index = _active_servers[active_index]; + assert(get_num_uncompleted(server_index) < opts().conn_window_size); + assert(_server_info[server_index].conn != NULL); + + return server_index; + } + static inline bool is_control_iter(long iter) { return (iter % 10) == 0; } @@ -1333,13 +1364,6 @@ class DemoClient : public P2pDemoCommon { op_info_t op_info[IO_OP_MAX] = {{0,0}}; while ((total_iter < opts().iter_count) && (_status == OK)) { - VERBOSE_LOG << " <<<< iteration " << total_iter << " >>>>"; - - wait_for_responses(opts().window_size - 1); - if (_status != OK) { - break; - } - connect_all(is_control_iter(total_iter)); if (_status != OK) { break; @@ -1353,27 +1377,40 @@ class DemoClient : public P2pDemoCommon { continue; } - /* Pick random connected server */ - size_t active_index = IoDemoRandom::rand(size_t(0), - _active_servers.size() - 1); - size_t server_index = _active_servers[active_index]; - assert(_server_info[server_index].conn != NULL); + VERBOSE_LOG << " <<<< iteration " << total_iter << " >>>>"; + long conns_window_size = opts().conn_window_size * + _active_servers.size(); + long max_outstanding = std::min(opts().window_size, + conns_window_size) - 1; + wait_for_responses(max_outstanding); + if (_status != OK) { + break; + } + + if (_num_active_servers_to_use == 0) { + // It is possible that the number of active servers to use is 0 + // after wait_for_responses(), if some clients were closed in + // UCP Worker progress during handling of remote disconnection + // from servers + continue; + } - io_op_t op = get_op(); + size_t server_index = pick_server_index(); + io_op_t op = get_op(); size_t size; switch (op) { case IO_READ: if (opts().use_am) { - size = do_io_read_am(_server_info[server_index], sn); + size = do_io_read_am(server_index, sn); } else { - size = do_io_read(_server_info[server_index], sn); + size = do_io_read(server_index, sn); } break; case IO_WRITE: if (opts().use_am) { - size = do_io_write_am(_server_info[server_index], sn); + size = do_io_write_am(server_index, sn); } else { - size = do_io_write(_server_info[server_index], sn); + size = do_io_write(server_index, sn); } break; default: @@ -1488,8 +1525,8 @@ class DemoClient : public P2pDemoCommon { for (size_t server_index = 0; server_index < _server_info.size(); ++server_index) { server_info_t& server_info = _server_info[server_index]; - long delta_completed = server_info.num_completed[op_id] - - server_info.prev_completed[op_id]; + long delta_completed = server_info.num_completed[op_id] - + server_info.prev_completed[op_id]; if ((delta_completed < delta_min) || ((delta_completed == delta_min) && (server_info.retry_count > @@ -1527,9 +1564,53 @@ class DemoClient : public P2pDemoCommon { } } + void active_servers_swap(size_t index1, size_t index2) { + size_t& active_server1 = _active_servers[index1]; + size_t& active_server2 = _active_servers[index2]; + + std::swap(_server_info[active_server1].active_index, + _server_info[active_server2].active_index); + std::swap(active_server1, active_server2); + } + + void active_servers_add(size_t server_index) { + assert(_num_active_servers_to_use <= _active_servers.size()); + + _active_servers.push_back(server_index); + _server_info[server_index].active_index = _active_servers.size() - 1; + active_servers_make_used(_server_info[server_index].active_index); + assert(_num_active_servers_to_use <= _active_servers.size()); + } + + void active_servers_remove(size_t active_index) { + assert(active_index < _active_servers.size()); + + if (active_index < _num_active_servers_to_use) { + active_servers_make_unused(active_index); + active_index = _num_active_servers_to_use; + } + + assert(active_index >= _num_active_servers_to_use); + active_servers_swap(active_index, _active_servers.size() - 1); + _active_servers.pop_back(); + } + + void active_servers_make_unused(size_t active_index) { + assert(active_index < _num_active_servers_to_use); + --_num_active_servers_to_use; + active_servers_swap(active_index, _num_active_servers_to_use); + } + + void active_servers_make_used(size_t active_index) { + assert(active_index >= _num_active_servers_to_use); + active_servers_swap(active_index, _num_active_servers_to_use); + ++_num_active_servers_to_use; + } + private: std::vector _server_info; std::vector _active_servers; + size_t _num_active_servers_to_use; std::map _server_index_lookup; double _prev_connect_time; long _num_sent; @@ -1626,6 +1707,21 @@ static void adjust_opts(options_t *test_opts) { test_opts->max_data_size); } +static int parse_window_size(const char *optarg, long &window_size, + const std::string &window_size_str) { + window_size = strtol(optarg, NULL, 0); + if ((window_size <= 0) || + // If the converted value falls out of range of corresponding + // return type, LONG_MAX is returned + (window_size == std::numeric_limits::max())) { + std::cout << "invalid " << window_size_str << " size '" << optarg + << "'" << std::endl; + return -1; + } + + return 0; +} + static int parse_args(int argc, char **argv, options_t *test_opts) { char *str; @@ -1646,12 +1742,13 @@ static int parse_args(int argc, char **argv, options_t *test_opts) test_opts->iomsg_size = 256; test_opts->iter_count = 1000; test_opts->window_size = 1; + test_opts->conn_window_size = 1; test_opts->random_seed = std::time(NULL); test_opts->verbose = false; test_opts->validate = false; test_opts->use_am = false; - while ((c = getopt(argc, argv, "p:c:r:d:b:i:w:k:o:t:n:l:s:y:vqaHP:")) != -1) { + while ((c = getopt(argc, argv, "p:c:r:d:b:i:w:a:k:o:t:n:l:s:y:vqAHP:")) != -1) { switch (c) { case 'p': test_opts->port_num = atoi(optarg); @@ -1692,7 +1789,16 @@ static int parse_args(int argc, char **argv, options_t *test_opts) } break; case 'w': - test_opts->window_size = atoi(optarg); + if (parse_window_size(optarg, test_opts->window_size, + "window") != 0) { + return -1; + } + break; + case 'a': + if (parse_window_size(optarg, test_opts->conn_window_size, + "per connection window") != 0) { + return -1; + } break; case 'k': test_opts->chunk_size = strtol(optarg, NULL, 0); @@ -1754,7 +1860,7 @@ static int parse_args(int argc, char **argv, options_t *test_opts) case 'q': test_opts->validate = true; break; - case 'a': + case 'A': test_opts->use_am = true; break; case 'H': @@ -1779,6 +1885,7 @@ static int parse_args(int argc, char **argv, options_t *test_opts) std::cout << " -b Number of offcache IO buffers" << std::endl; std::cout << " -i Number of iterations to run communication" << std::endl; std::cout << " -w Number of outstanding requests" << std::endl; + std::cout << " -a Number of outstanding requests per connection" << std::endl; std::cout << " -k Split the data transfer to chunks of this size" << std::endl; std::cout << " -r Size of IO request packet" << std::endl; std::cout << " -t Client timeout (or \"inf\")" << std::endl;