Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JUCX: Do not delete reference to listener connHandler (v.1.10.x) #6649

Merged
merged 1 commit into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Fix Infiniband port speed detection for HDR100
* Fix build issues in gtest-all.cc and sock.c with GCC11
* Fix performance degradation with cuda memory on self endpoint
* Fix bug in JUCX listener connection handler.

## 1.10.0 (March 9, 2021)
### Features:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
public class UcpListener extends UcxNativeStruct implements Closeable {

private InetSocketAddress address;
private UcpListenerConnectionHandler connectionHandler;

public UcpListener(UcpWorker worker, UcpListenerParams params) {
if (params.getSockAddr() == null) {
throw new UcxException("UcpListenerParams.sockAddr must be non-null.");
}
if (params.connectionHandler == null) {
throw new UcxException("Connection handler must be set");
}
this.connectionHandler = params.connectionHandler;
this.address = params.getSockAddr();
setNativeId(createUcpListener(params, worker.getNativeId()));
address = params.getSockAddr();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ public class UcpListenerParams extends UcxParams {
public UcpListenerParams clear() {
super.clear();
sockAddr = null;
connectionHandler = null;
return this;
}

private InetSocketAddress sockAddr;

private UcpListenerConnectionHandler connectionHandler;
UcpListenerConnectionHandler connectionHandler;

/**
* An address, on which {@link UcpListener} would bind.
Expand Down
1 change: 0 additions & 1 deletion bindings/java/src/main/native/jucx_common_def.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ void jucx_connection_handler(ucp_conn_request_h conn_request, void *arg)
jmethodID on_conn_request = env->GetMethodID(jucx_conn_hndl_cls, "onConnectionRequest",
"(Lorg/openucx/jucx/ucp/UcpConnectionRequest;)V");
env->CallVoidMethod(jucx_conn_handler, on_conn_request, jucx_conn_request);
env->DeleteGlobalRef(jucx_conn_handler);
}


Expand Down
2 changes: 1 addition & 1 deletion bindings/java/src/main/native/listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Java_org_openucx_jucx_ucp_UcpListener_createUcpListener(JNIEnv *env, jclass cls,
field = env->GetFieldID(jucx_listener_param_class,
"connectionHandler", "Lorg/openucx/jucx/ucp/UcpListenerConnectionHandler;");
jobject jucx_conn_handler = env->GetObjectField(ucp_listener_params, field);
params.conn_handler.arg = env->NewGlobalRef(jucx_conn_handler);
params.conn_handler.arg = env->NewWeakGlobalRef(jucx_conn_handler);
params.conn_handler.cb = jucx_connection_handler;
}

Expand Down
32 changes: 25 additions & 7 deletions bindings/java/src/test/java/org/openucx/jucx/UcpListenerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,32 @@ public void testConnectionHandler() throws Exception {
UcpEndpoint serverToClient = serverWorker2.newEndpoint(
new UcpEndpointParams().setConnectionRequest(conRequest.get()));

// Temporary workaround until new connection establishment protocol in UCX.
// Test connection handler persists
for (int i = 0; i < 10; i++) {
serverWorker1.progress();
serverWorker2.progress();
clientWorker.progress();
try {
Thread.sleep(10);
} catch (Exception ignored) { }
conRequest.set(null);
UcpEndpoint tmpEp = clientWorker.newEndpoint(new UcpEndpointParams()
.setSocketAddress(listener.getAddress()).setPeerErrorHandlingMode()
.setErrorHandler((ep, status, errorMsg) -> {

}));

while (conRequest.get() == null) {
serverWorker1.progress();
serverWorker2.progress();
clientWorker.progress();
}

UcpEndpoint tmpEp2 = serverWorker2.newEndpoint(
new UcpEndpointParams().setConnectionRequest(conRequest.get()));

UcpRequest close1 = tmpEp.closeNonBlockingFlush();
UcpRequest close2 = tmpEp2.closeNonBlockingFlush();

while (!close1.isCompleted() || !close2.isCompleted()) {
serverWorker1.progress();
serverWorker2.progress();
clientWorker.progress();
}
}

UcpRequest sent = serverToClient.sendStreamNonBlocking(
Expand Down