Skip to content

Commit

Permalink
JUCX: Do not delete reference to listener connHandler.
Browse files Browse the repository at this point in the history
  • Loading branch information
petro-rudenko committed Apr 20, 2021
1 parent 590848b commit e7b33c7
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 11 deletions.
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Fix Infiniband port speed detection for HDR100
* Fix build issues in gtest-all.cc and sock.c with GCC11
* Fix performance degradation with cuda memory on self endpoint
* Fix bug in JUCX listener connection handler.

## 1.10.0 (March 9, 2021)
### Features:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
public class UcpListener extends UcxNativeStruct implements Closeable {

private InetSocketAddress address;
private UcpListenerConnectionHandler connectionHandler;

public UcpListener(UcpWorker worker, UcpListenerParams params) {
if (params.getSockAddr() == null) {
throw new UcxException("UcpListenerParams.sockAddr must be non-null.");
}
if (params.connectionHandler == null) {
throw new UcxException("Connection handler must be set");
}
this.connectionHandler = params.connectionHandler;
this.address = params.getSockAddr();
setNativeId(createUcpListener(params, worker.getNativeId()));
address = params.getSockAddr();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ public class UcpListenerParams extends UcxParams {
public UcpListenerParams clear() {
super.clear();
sockAddr = null;
connectionHandler = null;
return this;
}

private InetSocketAddress sockAddr;

private UcpListenerConnectionHandler connectionHandler;
UcpListenerConnectionHandler connectionHandler;

/**
* An address, on which {@link UcpListener} would bind.
Expand Down
1 change: 0 additions & 1 deletion bindings/java/src/main/native/jucx_common_def.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ void jucx_connection_handler(ucp_conn_request_h conn_request, void *arg)
jmethodID on_conn_request = env->GetMethodID(jucx_conn_hndl_cls, "onConnectionRequest",
"(Lorg/openucx/jucx/ucp/UcpConnectionRequest;)V");
env->CallVoidMethod(jucx_conn_handler, on_conn_request, jucx_conn_request);
env->DeleteGlobalRef(jucx_conn_handler);
}


Expand Down
2 changes: 1 addition & 1 deletion bindings/java/src/main/native/listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Java_org_openucx_jucx_ucp_UcpListener_createUcpListener(JNIEnv *env, jclass cls,
field = env->GetFieldID(jucx_listener_param_class,
"connectionHandler", "Lorg/openucx/jucx/ucp/UcpListenerConnectionHandler;");
jobject jucx_conn_handler = env->GetObjectField(ucp_listener_params, field);
params.conn_handler.arg = env->NewGlobalRef(jucx_conn_handler);
params.conn_handler.arg = env->NewWeakGlobalRef(jucx_conn_handler);
params.conn_handler.cb = jucx_connection_handler;
}

Expand Down
32 changes: 25 additions & 7 deletions bindings/java/src/test/java/org/openucx/jucx/UcpListenerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,32 @@ public void testConnectionHandler() throws Exception {
UcpEndpoint serverToClient = serverWorker2.newEndpoint(
new UcpEndpointParams().setConnectionRequest(conRequest.get()));

// Temporary workaround until new connection establishment protocol in UCX.
// Test connection handler persists
for (int i = 0; i < 10; i++) {
serverWorker1.progress();
serverWorker2.progress();
clientWorker.progress();
try {
Thread.sleep(10);
} catch (Exception ignored) { }
conRequest.set(null);
UcpEndpoint tmpEp = clientWorker.newEndpoint(new UcpEndpointParams()
.setSocketAddress(listener.getAddress()).setPeerErrorHandlingMode()
.setErrorHandler((ep, status, errorMsg) -> {

}));

while (conRequest.get() == null) {
serverWorker1.progress();
serverWorker2.progress();
clientWorker.progress();
}

UcpEndpoint tmpEp2 = serverWorker2.newEndpoint(
new UcpEndpointParams().setConnectionRequest(conRequest.get()));

UcpRequest close1 = tmpEp.closeNonBlockingFlush();
UcpRequest close2 = tmpEp2.closeNonBlockingFlush();

while (!close1.isCompleted() || !close2.isCompleted()) {
serverWorker1.progress();
serverWorker2.progress();
clientWorker.progress();
}
}

UcpRequest sent = serverToClient.sendStreamNonBlocking(
Expand Down

0 comments on commit e7b33c7

Please sign in to comment.