-
Notifications
You must be signed in to change notification settings - Fork 423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IODEMO: non blocking connection establishment #6338
IODEMO: non blocking connection establishment #6338
Conversation
test/apps/iodemo/io_demo.cc
Outdated
_client.active_servers_add(_server_idx); | ||
LOG << "Connected to " << _client.server_name(_server_idx); | ||
} else { | ||
/* connection should be deleted be upper layer */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/be upper/on upper
@gleon99 could you please review? |
bot:pipe:retest |
LOG << "Connected to " << server_name(server_index); | ||
connect(server_index); | ||
assert(server_info.conn != NULL); | ||
assert(_status == OK); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance _status
can not be modified by a call to connect()
, thus there's no point in checking it (after line 1359 check)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO there is a point until Connection
implementation can affect UcxContext
state
I replaced if
condition with assert
because connect
is changed this way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't see where connect() can modify _status?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
via callback
@yosefe , can review please? |
test/apps/iodemo/io_demo.cc
Outdated
|
||
private: | ||
DemoClient &_client; | ||
size_t _server_idx; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const
test/apps/iodemo/io_demo.cc
Outdated
@@ -1300,7 +1348,7 @@ class DemoClient : public P2pDemoCommon { | |||
++server_index) { | |||
server_info_t& server_info = _server_info[server_index]; | |||
if (server_info.conn != NULL) { | |||
// Server is already connected | |||
// Server is already connected or in progress |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already connecting to this server
test/apps/iodemo/ucx_wrapper.cc
Outdated
_connection.handle_connection_error(_status); | ||
} | ||
|
||
established(_status); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo better call just one callback in case of error (either established of handle_connection_error) but not both
test/apps/iodemo/ucx_wrapper.cc
Outdated
_send_id_req = send_id_req; | ||
_recv_id_req = recv_id_req; | ||
_comp_count = 2; // both send and recv requests should be completed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we need to manage requests inside UcxConnection class.
and remove UcxEstablishCallback
instead, UcxConnection will hold a pointer to UcxCallback to call when the connection is established
test/apps/iodemo/ucx_wrapper.h
Outdated
@@ -73,6 +104,16 @@ class UcxLog { | |||
* Holds UCX global context and worker | |||
*/ | |||
class UcxContext { | |||
friend class UcxAcceptCallback; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we avoid access to private methods?
test/apps/iodemo/ucx_wrapper.h
Outdated
static unsigned _num_instances; | ||
|
||
UcxContext &_context; | ||
UcxEstablishCallback *_establish_cb; | ||
uint64_t _conn_id; | ||
uint64_t _remote_conn_id; | ||
char _log_prefix[MAX_LOG_PREFIX_SIZE]; | ||
ucp_ep_h _ep; | ||
void *_close_request; | ||
ucs_list_link_t _all_requests; | ||
ucs_status_t _ucx_status; | ||
bool _use_am; | ||
bool _connected; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls revert
test/apps/iodemo/ucx_wrapper.h
Outdated
ucs_status_ptr_t _send_id_req; | ||
ucs_status_ptr_t _recv_id_req; | ||
unsigned _comp_count; | ||
ucs_status_t _status; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align on column
if (status == UCS_OK) { | ||
_context.dispatch_connection_accepted(&_connection); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have some cleanup if it failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, the conn will go thru common failed conn list (by ep err handler), the calback will be deleted outside in stream_recv_cb
|
||
if (_connected) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why removed this check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the failed connection goes thru common failed list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we avoid calling user callback if _connected is false?
if not, why not, since it was the case before this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
before this PR we handled failed connection before and after "connected" differently.
now _connected
is needed only to handle active servers list on the app level
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void UcxContext::progress_failed_connections()
{
while (!_failed_conns.empty()) {
UcxConnection *conn = _failed_conns.front();
_failed_conns.pop_front();
if (conn->is_established) {
dispatch_connection_error(conn);
} else {
conn->callback(error);
}
}
}
test/apps/iodemo/ucx_wrapper.h
Outdated
double _connect_timeout; | ||
bool _use_am; | ||
typedef std::map<uint64_t, UcxConnection*> conn_map_t; | ||
typedef std::list<std::pair<double, UcxConnection*> > timeout_conn_t; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
many std::vector? to avoid malloc on every elem
|
||
if (_connected) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we avoid calling user callback if _connected is false?
if not, why not, since it was the case before this PR?
@@ -748,6 +750,8 @@ void UcxConnection::stream_recv_callback(void *request, ucs_status_t status, | |||
ucx_request *r = reinterpret_cast<ucx_request*>(request); | |||
UcxConnection *conn = r->conn; | |||
|
|||
// need to check connection status first since it's already can be failed or | |||
// disconnecting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we check _connected field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, because it's always false here, conn->established();
will be called at line 757
test/apps/iodemo/ucx_wrapper.cc
Outdated
// send local connection id | ||
void *sreq = ucp_stream_send_nb(_ep, &_conn_id, 1, dt_int, | ||
stream_send_callback, 0); | ||
// we can don't check status here, in case if the endpoint is failed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do not have to check the status here, ...
} | ||
|
||
process_request("conn_id receive", rreq, callback); | ||
if (UCS_PTR_IS_PTR(rreq)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (UCS_PTR_IS_ERR(rreq)) {
return;
}
then can remove line 853
test/apps/iodemo/ucx_wrapper.cc
Outdated
_conn_id = reinterpret_cast<uint64_t>(_ep); | ||
established(); | ||
process_request("connect AM", NULL, callback); | ||
delete callback; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIR in case of sen operation, the callback deletes itself. so in conn callback case, should be the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connection keeps poiter to the callback, I would like to avoid freed poiter there, so IMO better to delete and NULL'ize the poiter in the same place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after callback is called, set conn->callback = NULL, user should take care of its memory release
test/apps/iodemo/ucx_wrapper.cc
Outdated
_conn_id = reinterpret_cast<uint64_t>(_ep); | ||
established(); | ||
process_request("connect AM", NULL, callback); | ||
delete callback; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after callback is called, set conn->callback = NULL, user should take care of its memory release
|
||
if (_connected) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void UcxContext::progress_failed_connections()
{
while (!_failed_conns.empty()) {
UcxConnection *conn = _failed_conns.front();
_failed_conns.pop_front();
if (conn->is_established) {
dispatch_connection_error(conn);
} else {
conn->callback(error);
}
}
}
test/apps/iodemo/ucx_wrapper.h
Outdated
@@ -224,6 +243,12 @@ class UcxConnection { | |||
return _ucx_status; | |||
} | |||
|
|||
bool is_established() const { | |||
return _connected; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- return _callback == NULL
- remove _connected
test/apps/iodemo/ucx_wrapper.cc
Outdated
assert(conn->_establish_cb == r->callback); | ||
common_request_callback(request, status); | ||
delete conn->_establish_cb; | ||
conn->_establish_cb = NULL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe keep conn->_establish_cb = NULL;
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved it to established
and handle_connection_error
where the callback is invoked
return; | ||
} | ||
|
||
UCX_CONN_LOG << "detected error: " << ucs_status_string(status); | ||
_ucx_status = status; | ||
|
||
/* the upper layer should close the connection */ | ||
_context.handle_connection_error(this); | ||
if (is_established()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't this some duplication of d71ac19#diff-d3eefc2ff6f38844ae09889a04bb916b5433731d5780f8a10bd9d6bf7862ab41R370?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no but I agree - naming looks confusing...
test/apps/iodemo/ucx_wrapper.cc
Outdated
void UcxContext::dispatch_connection_accepted(UcxConnection* conn) | ||
{ | ||
} | ||
|
||
void UcxContext::handle_connection_error(UcxConnection *conn) | ||
{ | ||
remove_connection_inprogress(conn); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do it only if not established?
test/apps/iodemo/ucx_wrapper.cc
Outdated
if (conn->is_established()) { | ||
dispatch_connection_error(conn); | ||
} else { | ||
conn->handle_connection_error(conn->ucx_status()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should call a helper function here, and call this helper also from established():
PS. it's a bit confusing that "establish_cb" is both for success and error, while "established()" is called only in case of success
UcxConnection::connect_completed()
{
_context.remove_connection_inprogress(this);
_context.add_connection(this);
(*_establish_cb)(_status);
_establish_cb = NULL;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think it's much better. The issue here that we have to handle failed connection differently outside depends on it's internal state. Maybe we should re-think how to advance connection state for simplicity - what should be done by app, context and connection itself
4757a73
to
17f8007
Compare
} else { | ||
established(UCS_PTR_STATUS(rreq)); | ||
if (rreq != NULL) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls add comment /* failed to receive */
test/apps/iodemo/ucx_wrapper.cc
Outdated
if (_connected) { | ||
/* the upper layer should close the connection */ | ||
/* the upper layer should close the connection */ | ||
if (_establish_cb != NULL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not use is_established() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls squash
d2bbaf1
to
9901186
Compare
What
implement non-blocking connection establishment
Why ?
to improve scalability