From 111a128e90b4de281e8950be63d178cebdfe2f57 Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Wed, 18 Nov 2020 15:33:23 +0200 Subject: [PATCH 1/2] JUCX: add exceptions to be thrown by progress and errorHandler. --- .../examples/UcxReadBWBenchmarkReceiver.java | 14 ------ .../examples/UcxReadBWBenchmarkSender.java | 47 +++++++++-------- .../jucx/ucp/UcpEndpointErrorHandler.java | 2 +- .../java/org/openucx/jucx/ucp/UcpWorker.java | 6 +-- .../org/openucx/jucx/ucs/UcsConstants.java | 50 +++++++++++++++++++ .../java/src/main/native/ucs_constants.cc | 42 +++++++++++++++- .../org/openucx/jucx/UcpEndpointTest.java | 25 ++++++---- .../org/openucx/jucx/UcpListenerTest.java | 14 ++++-- .../java/org/openucx/jucx/UcpRequestTest.java | 2 +- .../java/org/openucx/jucx/UcpWorkerTest.java | 14 ++++-- 10 files changed, 157 insertions(+), 59 deletions(-) diff --git a/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkReceiver.java b/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkReceiver.java index cc1b79c9558..733835d3e2c 100644 --- a/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkReceiver.java +++ b/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkReceiver.java @@ -42,14 +42,6 @@ public static void main(String[] args) throws Exception { .setConnectionRequest(connRequest.get()) .setPeerErrorHandlingMode()); - // Temporary workaround until new connection establishment protocol in UCX. - for (int i = 0; i < 10; i++) { - worker.progress(); - try { - Thread.sleep(10); - } catch (Exception ignored) { } - } - ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096); UcpRequest recvRequest = worker.recvTaggedNonBlocking(recvBuffer, null); @@ -95,12 +87,6 @@ public void onSuccess(UcpRequest request) { data.put(0, (byte)1); } - ByteBuffer sendBuffer = ByteBuffer.allocateDirect(100); - sendBuffer.asCharBuffer().put("DONE"); - - UcpRequest sent = endpoint.sendTaggedNonBlocking(sendBuffer, null); - worker.progressRequest(sent); - UcpRequest closeRequest = endpoint.closeNonBlockingFlush(); worker.progressRequest(closeRequest); // Close request won't be return to pull automatically, since there's no callback. diff --git a/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkSender.java b/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkSender.java index 9c60206c11a..dec34891d42 100644 --- a/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkSender.java +++ b/bindings/java/src/main/java/org/openucx/jucx/examples/UcxReadBWBenchmarkSender.java @@ -5,13 +5,12 @@ package org.openucx.jucx.examples; -import org.openucx.jucx.UcxCallback; -import org.openucx.jucx.ucp.UcpRequest; +import org.openucx.jucx.UcxException; +import org.openucx.jucx.ucp.*; import org.openucx.jucx.UcxUtils; -import org.openucx.jucx.ucp.UcpEndpoint; -import org.openucx.jucx.ucp.UcpEndpointParams; -import org.openucx.jucx.ucp.UcpMemory; +import org.openucx.jucx.ucs.UcsConstants; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -28,6 +27,13 @@ public static void main(String[] args) throws Exception { String serverHost = argsMap.get("s"); UcpEndpoint endpoint = worker.newEndpoint(new UcpEndpointParams() .setPeerErrorHandlingMode() + .setErrorHandler((ep, status, errorMsg) -> { + if (status == UcsConstants.STATUS.UCS_ERR_CONNECTION_RESET) { + throw new ConnectException(errorMsg); + } else { + throw new UcxException(errorMsg); + } + }) .setSocketAddress(new InetSocketAddress(serverHost, serverPort))); UcpMemory memory = context.memoryMap(allocationParams); @@ -49,22 +55,21 @@ public static void main(String[] args) throws Exception { // Send memory metadata and wait until receiver will finish benchmark. endpoint.sendTaggedNonBlocking(sendData, null); - ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096); - UcpRequest recvRequest = worker.recvTaggedNonBlocking(recvBuffer, - new UcxCallback() { - @Override - public void onSuccess(UcpRequest request) { - System.out.println("Received a message:"); - System.out.println(recvBuffer.asCharBuffer().toString().trim()); - } - }); - - worker.progressRequest(recvRequest); - UcpRequest closeRequest = endpoint.closeNonBlockingFlush(); - worker.progressRequest(closeRequest); - resources.push(closeRequest); - - closeResources(); + try { + while (true) { + if (worker.progress() == 0) { + worker.waitForEvents(); + } + } + } catch (ConnectException ignored) { + } catch (Exception ex) { + System.err.println(ex.getMessage()); + } finally { + UcpRequest closeRequest = endpoint.closeNonBlockingForce(); + worker.progressRequest(closeRequest); + resources.push(closeRequest); + closeResources(); + } } } diff --git a/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpEndpointErrorHandler.java b/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpEndpointErrorHandler.java index 855e5ef5f46..e53e24d6642 100755 --- a/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpEndpointErrorHandler.java +++ b/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpEndpointErrorHandler.java @@ -15,5 +15,5 @@ public interface UcpEndpointErrorHandler { * all subsequent operations on this ep will fail with * the error code passed in {@code status}. */ - void onError(UcpEndpoint ep, int status, String errorMsg); + void onError(UcpEndpoint ep, int status, String errorMsg) throws Exception; } diff --git a/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpWorker.java b/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpWorker.java index 8da85f2e4d2..7848246bda7 100644 --- a/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpWorker.java +++ b/bindings/java/src/main/java/org/openucx/jucx/ucp/UcpWorker.java @@ -59,14 +59,14 @@ public void close() { * This routine explicitly progresses all communication operations on a worker. * @return Non-zero if any communication was progressed, zero otherwise. */ - public int progress() { + public int progress() throws Exception { return progressWorkerNative(getNativeId()); } /** * Blocking progress for request until it's not completed. */ - public void progressRequest(UcpRequest request) { + public void progressRequest(UcpRequest request) throws Exception { while (!request.isCompleted()) { progress(); } @@ -251,7 +251,7 @@ public ByteBuffer getAddress() { private static native void releaseAddressNative(long workerId, ByteBuffer addressId); - private static native int progressWorkerNative(long workerId); + private static native int progressWorkerNative(long workerId) throws Exception; private static native UcpRequest flushNonBlockingNative(long workerId, UcxCallback callback); diff --git a/bindings/java/src/main/java/org/openucx/jucx/ucs/UcsConstants.java b/bindings/java/src/main/java/org/openucx/jucx/ucs/UcsConstants.java index b22f0b1da60..e775a3b4daf 100644 --- a/bindings/java/src/main/java/org/openucx/jucx/ucs/UcsConstants.java +++ b/bindings/java/src/main/java/org/openucx/jucx/ucs/UcsConstants.java @@ -22,6 +22,56 @@ public static class ThreadMode { public static int UCS_THREAD_MODE_MULTI; } + /** + * Status codes + */ + public static class STATUS { + static { + load(); + } + + /* Operation completed successfully */ + public static int UCS_OK; + + /* Operation is queued and still in progress */ + public static int UCS_INPROGRESS; + + /* Failure codes */ + public static int UCS_ERR_NO_MESSAGE; + public static int UCS_ERR_NO_RESOURCE; + public static int UCS_ERR_IO_ERROR; + public static int UCS_ERR_NO_MEMORY; + public static int UCS_ERR_INVALID_PARAM; + public static int UCS_ERR_UNREACHABLE; + public static int UCS_ERR_INVALID_ADDR; + public static int UCS_ERR_NOT_IMPLEMENTED; + public static int UCS_ERR_MESSAGE_TRUNCATED; + public static int UCS_ERR_NO_PROGRESS; + public static int UCS_ERR_BUFFER_TOO_SMALL; + public static int UCS_ERR_NO_ELEM; + public static int UCS_ERR_SOME_CONNECTS_FAILED; + public static int UCS_ERR_NO_DEVICE; + public static int UCS_ERR_BUSY; + public static int UCS_ERR_CANCELED; + public static int UCS_ERR_SHMEM_SEGMENT; + public static int UCS_ERR_ALREADY_EXISTS; + public static int UCS_ERR_OUT_OF_RANGE; + public static int UCS_ERR_TIMED_OUT; + public static int UCS_ERR_EXCEEDS_LIMIT; + public static int UCS_ERR_UNSUPPORTED; + public static int UCS_ERR_REJECTED; + public static int UCS_ERR_NOT_CONNECTED; + public static int UCS_ERR_CONNECTION_RESET; + + public static int UCS_ERR_FIRST_LINK_FAILURE; + public static int UCS_ERR_LAST_LINK_FAILURE; + public static int UCS_ERR_FIRST_ENDPOINT_FAILURE; + public static int UCS_ERR_ENDPOINT_TIMEOUT; + public static int UCS_ERR_LAST_ENDPOINT_FAILURE; + + public static int UCS_ERR_LAST; + } + private static void load() { NativeLibs.load(); loadConstants(); diff --git a/bindings/java/src/main/native/ucs_constants.cc b/bindings/java/src/main/native/ucs_constants.cc index 28507b05c66..f255c3a04d7 100644 --- a/bindings/java/src/main/native/ucs_constants.cc +++ b/bindings/java/src/main/native/ucs_constants.cc @@ -9,9 +9,49 @@ #include JNIEXPORT void JNICALL -Java_org_openucx_jucx_ucs_UcsConstants_loadConstants(JNIEnv *env, jclass cls) +Java_org_openucx_jucx_ucs_UcsConstants_loadConstants(JNIEnv *env, jclass ucs_class) { jclass thread_mode = env->FindClass("org/openucx/jucx/ucs/UcsConstants$ThreadMode"); jfieldID field = env->GetStaticFieldID(thread_mode, "UCS_THREAD_MODE_MULTI", "I"); env->SetStaticIntField(thread_mode, field, UCS_THREAD_MODE_MULTI); + + jclass cls = env->FindClass("org/openucx/jucx/ucs/UcsConstants$STATUS"); + + /* Operation completed successfully */ + JUCX_DEFINE_INT_CONSTANT(UCS_OK); + + /* Operation is queued and stil in progress */ + JUCX_DEFINE_INT_CONSTANT(UCS_INPROGRESS); + /* Failure codes */ + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NO_MESSAGE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NO_RESOURCE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_IO_ERROR); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NO_MEMORY); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_INVALID_PARAM); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_UNREACHABLE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_INVALID_ADDR); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NOT_IMPLEMENTED); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_MESSAGE_TRUNCATED); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NO_PROGRESS); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_BUFFER_TOO_SMALL); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NO_ELEM); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_SOME_CONNECTS_FAILED); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NO_DEVICE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_BUSY); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_CANCELED); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_SHMEM_SEGMENT); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_ALREADY_EXISTS); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_OUT_OF_RANGE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_TIMED_OUT); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_EXCEEDS_LIMIT); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_UNSUPPORTED); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_REJECTED); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_NOT_CONNECTED); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_CONNECTION_RESET); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_FIRST_LINK_FAILURE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_LAST_LINK_FAILURE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_FIRST_ENDPOINT_FAILURE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_ENDPOINT_TIMEOUT); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_LAST_ENDPOINT_FAILURE); + JUCX_DEFINE_INT_CONSTANT(UCS_ERR_LAST); } diff --git a/bindings/java/src/test/java/org/openucx/jucx/UcpEndpointTest.java b/bindings/java/src/test/java/org/openucx/jucx/UcpEndpointTest.java index ea5ffec6188..7af33927e20 100644 --- a/bindings/java/src/test/java/org/openucx/jucx/UcpEndpointTest.java +++ b/bindings/java/src/test/java/org/openucx/jucx/UcpEndpointTest.java @@ -31,7 +31,7 @@ public void testConnectToListenerByWorkerAddr() { } @Test - public void testGetNB() { + public void testGetNB() throws Exception { // Crerate 2 contexts + 2 workers UcpParams params = new UcpParams().requestRmaFeature(); UcpWorkerParams rdmaWorkerParams = new UcpWorkerParams().requestWakeupRMA(); @@ -102,7 +102,7 @@ public void onSuccess(UcpRequest request) { } @Test - public void testPutNB() { + public void testPutNB() throws Exception { // Crerate 2 contexts + 2 workers UcpParams params = new UcpParams().requestRmaFeature(); UcpWorkerParams rdmaWorkerParams = new UcpWorkerParams().requestWakeupRMA(); @@ -186,7 +186,7 @@ public void onSuccess(UcpRequest request) { } @Test - public void testRecvAfterSend() { + public void testRecvAfterSend() throws Exception { long sendTag = 4L; // Crerate 2 contexts + 2 workers UcpParams params = new UcpParams().requestRmaFeature().requestTagFeature() @@ -211,8 +211,13 @@ public void testRecvAfterSend() { @Override public void run() { while (!isInterrupted()) { - worker1.progress(); - worker2.progress(); + try { + worker1.progress(); + worker2.progress(); + } catch (Exception ex) { + System.err.println(ex.getMessage()); + ex.printStackTrace(); + } } } }; @@ -263,7 +268,7 @@ public void onSuccess(UcpRequest request) { } @Test - public void testBufferOffset() { + public void testBufferOffset() throws Exception { int msgSize = 200; int offset = 100; // Crerate 2 contexts + 2 workers @@ -311,7 +316,7 @@ public void testBufferOffset() { } @Test - public void testFlushEp() { + public void testFlushEp() throws Exception { int numRequests = 10; // Crerate 2 contexts + 2 workers UcpParams params = new UcpParams().requestRmaFeature(); @@ -356,7 +361,7 @@ public void onSuccess(UcpRequest request) { } @Test - public void testRecvSize() { + public void testRecvSize() throws Exception { UcpContext context1 = new UcpContext(new UcpParams().requestTagFeature()); UcpContext context2 = new UcpContext(new UcpParams().requestTagFeature()); @@ -386,7 +391,7 @@ public void testRecvSize() { } @Test - public void testStreamingAPI() { + public void testStreamingAPI() throws Exception { UcpParams params = new UcpParams().requestStreamFeature().requestRmaFeature(); UcpContext context1 = new UcpContext(params); UcpContext context2 = new UcpContext(params); @@ -537,7 +542,7 @@ public void testIovOperations() throws Exception { } @Test - public void testEpErrorHandler() { + public void testEpErrorHandler() throws Exception { // Crerate 2 contexts + 2 workers UcpParams params = new UcpParams().requestTagFeature(); UcpWorkerParams workerParams = new UcpWorkerParams(); diff --git a/bindings/java/src/test/java/org/openucx/jucx/UcpListenerTest.java b/bindings/java/src/test/java/org/openucx/jucx/UcpListenerTest.java index 658a6019700..f17d2b2940d 100644 --- a/bindings/java/src/test/java/org/openucx/jucx/UcpListenerTest.java +++ b/bindings/java/src/test/java/org/openucx/jucx/UcpListenerTest.java @@ -86,7 +86,7 @@ static UcpListener tryBindListener(UcpWorker worker, UcpListenerParams params) { } @Test - public void testConnectionHandler() { + public void testConnectionHandler() throws Exception { UcpContext context1 = new UcpContext(new UcpParams().requestStreamFeature() .requestRmaFeature()); UcpContext context2 = new UcpContext(new UcpParams().requestStreamFeature() @@ -113,7 +113,7 @@ public void testConnectionHandler() { // Create endpoint from another worker from pool. UcpEndpoint serverToClient = serverWorker2.newEndpoint( new UcpEndpointParams().setConnectionRequest(conRequest.get())); - + // Temporary workaround until new connection establishment protocol in UCX. for (int i = 0; i < 10; i++) { serverWorker1.progress(); @@ -147,8 +147,16 @@ public void testConnectionHandler() { assertEquals(UcpMemoryTest.MEM_SIZE, recv.getRecvSize()); + UcpRequest serverClose = serverToClient.closeNonBlockingFlush(); + UcpRequest clientClose = clientToServer.closeNonBlockingFlush(); + + while (!serverClose.isCompleted() || !clientClose.isCompleted()) { + serverWorker2.progress(); + clientWorker.progress(); + } + Collections.addAll(resources, context2, context1, clientWorker, serverWorker1, - serverWorker2, listener, serverToClient, clientToServer); + serverWorker2, listener); closeResources(); } } diff --git a/bindings/java/src/test/java/org/openucx/jucx/UcpRequestTest.java b/bindings/java/src/test/java/org/openucx/jucx/UcpRequestTest.java index 0ac1fc6327c..072852d02f3 100644 --- a/bindings/java/src/test/java/org/openucx/jucx/UcpRequestTest.java +++ b/bindings/java/src/test/java/org/openucx/jucx/UcpRequestTest.java @@ -12,7 +12,7 @@ public class UcpRequestTest { @Test - public void testCancelRequest() { + public void testCancelRequest() throws Exception { UcpContext context = new UcpContext(new UcpParams().requestTagFeature()); UcpWorker worker = context.newWorker(new UcpWorkerParams()); UcpRequest recv = worker.recvTaggedNonBlocking(ByteBuffer.allocateDirect(100), null); diff --git a/bindings/java/src/test/java/org/openucx/jucx/UcpWorkerTest.java b/bindings/java/src/test/java/org/openucx/jucx/UcpWorkerTest.java index d896898a038..da6596dd07f 100644 --- a/bindings/java/src/test/java/org/openucx/jucx/UcpWorkerTest.java +++ b/bindings/java/src/test/java/org/openucx/jucx/UcpWorkerTest.java @@ -19,7 +19,7 @@ public class UcpWorkerTest extends UcxTest { private static int numWorkers = Runtime.getRuntime().availableProcessors(); @Test - public void testSingleWorker() { + public void testSingleWorker() throws Exception { UcpContext context = new UcpContext(new UcpParams().requestTagFeature()); assertEquals(2, UcsConstants.ThreadMode.UCS_THREAD_MODE_MULTI); assertNotEquals(context.getNativeId(), null); @@ -99,8 +99,12 @@ public void testWorkerSleepWakeup() throws InterruptedException { @Override public void run() { while (!isInterrupted()) { - if (worker.progress() == 0) { - worker.waitForEvents(); + try { + if (worker.progress() == 0) { + worker.waitForEvents(); + } + } catch (Exception e) { + e.printStackTrace(); } } success.set(true); @@ -120,7 +124,7 @@ public void run() { } @Test - public void testFlushWorker() { + public void testFlushWorker() throws Exception { int numRequests = 10; // Crerate 2 contexts + 2 workers UcpParams params = new UcpParams().requestRmaFeature(); @@ -166,7 +170,7 @@ public void onSuccess(UcpRequest request) { } @Test - public void testTagProbe() { + public void testTagProbe() throws Exception { UcpParams params = new UcpParams().requestTagFeature(); UcpContext context1 = new UcpContext(params); UcpContext context2 = new UcpContext(params); From 5ca2a322ae322b1d9afff4d0b4cddf52167a47f0 Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Wed, 18 Nov 2020 11:20:50 +0200 Subject: [PATCH 2/2] UCP: SOCKADDR_CM_ENABLE=y by default --- src/ucp/core/ucp_context.c | 2 +- src/ucp/core/ucp_ep.c | 2 +- src/ucp/wireup/wireup.c | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index 71676e3ca84..5b5c5e4b7e5 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -284,7 +284,7 @@ static ucs_config_field_t ucp_config_table[] = { "of all entities which connect to each other are the same.", ucs_offsetof(ucp_config_t, ctx.unified_mode), UCS_CONFIG_TYPE_BOOL}, - {"SOCKADDR_CM_ENABLE", "n" /* TODO: set try by default */, + {"SOCKADDR_CM_ENABLE", "y", "Enable alternative wireup protocol for sockaddr connected endpoints.\n" "Enabling this mode changes underlying UCT mechanism for connection\n" "establishment and enables synchronized close protocol which does not\n" diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 91b8d277192..4caf0ff721a 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -379,7 +379,7 @@ ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags, /* all operations will use the first lane, which is a stub endpoint before * reconfiguration */ key.am_lane = 0; - if (ucp_worker_sockaddr_is_cm_proto(ep->worker)) { + if (ucp_ep_init_flags_has_cm(ep_init_flags)) { key.cm_lane = 0; } else { key.wireup_msg_lane = 0; diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 44a8865fb5c..7f34412bbf8 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -1463,9 +1463,9 @@ unsigned ucp_ep_init_flags(const ucp_worker_h worker, { unsigned flags = ucp_cm_ep_init_flags(worker, params); - if ((ucp_worker_sockaddr_is_cm_proto(worker) && - worker->context->config.ext.cm_use_all_devices) || - (params->field_mask & UCP_EP_PARAM_FIELD_SOCK_ADDR)) { + if (ucp_ep_init_flags_has_cm(flags) && + worker->context->config.ext.cm_use_all_devices) { + /* request AM lane for wireup MSG protocol which enables all devices */ flags |= UCP_EP_INIT_CREATE_AM_LANE; }