Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into ucs_cbq
Browse files Browse the repository at this point in the history
  • Loading branch information
evgeny-leksikov committed Dec 16, 2020
2 parents 71011b0 + 50c10f2 commit 087e922
Show file tree
Hide file tree
Showing 208 changed files with 4,914 additions and 2,080 deletions.
101 changes: 101 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# C
BasedOnStyle: LLVM
AlignEscapedNewlines: DontAlign
AlignConsecutiveAssignments: true
AlignConsecutiveDeclarations: false
AlignConsecutiveMacros: true
AlignAfterOpenBracket: true
AlignOperands: true
PointerAlignment: Right
DerivePointerAlignment: false
AlignTrailingComments: false
AllowAllArgumentsOnNextLine: false
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: false
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterReturnType: None
PenaltyReturnTypeOnItsOwnLine: 100
PenaltyBreakAssignment: 100
PenaltyExcessCharacter: 10
ColumnLimit: 85
AlwaysBreakBeforeMultilineStrings: false
BinPackArguments: true
BinPackParameters: true
BreakBeforeBraces: Custom
BraceWrapping:
AfterClass: true
AfterControlStatement: false
AfterEnum: false
AfterFunction: true
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
AfterExternBlock: false
BeforeCatch: false
BeforeElse: false
IndentBraces: false
SplitEmptyFunction: true
SplitEmptyRecord: true
SplitEmptyNamespace: true
BreakBeforeBinaryOperators: false
BreakBeforeTernaryOperators: false
BreakStringLiterals: true
ContinuationIndentWidth: 4
IncludeBlocks: Regroup
IndentCaseLabels: false
IndentWidth: 4
KeepEmptyLinesAtTheStartOfBlocks: false
IndentPPDirectives: None
MaxEmptyLinesToKeep: 2
SpacesInCStyleCastParentheses: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
SpaceInEmptyParentheses: false
SpaceBeforeParens: ControlStatementsExceptForEachMacros
SpaceBeforeAssignmentOperators: true
SpaceAfterCStyleCast: false
ForEachMacros: ["ucs_for_each_submask"]

# CPP
Standard: Cpp11
AccessModifierOffset: -4
AlwaysBreakTemplateDeclarations: false
BreakBeforeInheritanceComma: false
CompactNamespaces: false
ConstructorInitializerAllOnOneLineOrOnePerLine: true
ConstructorInitializerIndentWidth: 4
Cpp11BracedListStyle: true
FixNamespaceComments: true
NamespaceIndentation: None
UseTab: Never
ReflowComments: true
SortIncludes: true
IncludeCategories:
- Regex: '^"'
Priority: 1
- Regex: '^<'
Priority: 2
SortUsingDeclarations: true
TabWidth: 4
SpacesInAngles: false
SpacesBeforeTrailingComments: 1
SpaceAfterTemplateKeyword: false
SpacesInContainerLiterals: false

# Java
Language: Java
DisableFormat: true

# Issues:
# 1.
# Pointer alignment + declaration alignment:
# long_type_name var;
# void * ptr;
# Instead of:
#void *ptr;

...
3 changes: 2 additions & 1 deletion NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

## Current
### Features: TBD
#### UCX Core TBD
#### UCX Core
- Added ucp_tag_msg_recv_nbx routine.
#### UCX Java (API Preview) TBD
### Bugfixes: TBD

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ protected static void createContextAndWorker() {
}

protected static double getBandwithGbits(long nanoTimeDelta, long size) {
double sizeInGigabits = (double)size * 8.0 / 1e9;
double secondsElapsed = nanoTimeDelta / 1e9;
return sizeInGigabits / secondsElapsed;
return (double)size * 8.0 / nanoTimeDelta;
}

protected static void closeResources() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ public static void main(String[] args) throws Exception {
.setConnectionRequest(connRequest.get())
.setPeerErrorHandlingMode());

// Temporary workaround until new connection establishment protocol in UCX.
for (int i = 0; i < 10; i++) {
worker.progress();
try {
Thread.sleep(10);
} catch (Exception ignored) { }
}

ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096);
UcpRequest recvRequest = worker.recvTaggedNonBlocking(recvBuffer, null);

Expand All @@ -76,9 +68,9 @@ public static void main(String[] args) throws Exception {
for (int i = 0; i < numIterations; i++) {
final int iterNum = i;
UcpRequest getRequest = endpoint.getNonBlocking(remoteAddress, remoteKey,
recvMemory.getAddress(), totalSize,
recvMemory.getAddress(), remoteSize,
new UcxCallback() {
long startTime = System.nanoTime();
final long startTime = System.nanoTime();

@Override
public void onSuccess(UcpRequest request) {
Expand All @@ -95,12 +87,6 @@ public void onSuccess(UcpRequest request) {
data.put(0, (byte)1);
}

ByteBuffer sendBuffer = ByteBuffer.allocateDirect(100);
sendBuffer.asCharBuffer().put("DONE");

UcpRequest sent = endpoint.sendTaggedNonBlocking(sendBuffer, null);
worker.progressRequest(sent);

UcpRequest closeRequest = endpoint.closeNonBlockingFlush();
worker.progressRequest(closeRequest);
// Close request won't be return to pull automatically, since there's no callback.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@

package org.openucx.jucx.examples;

import org.openucx.jucx.UcxCallback;
import org.openucx.jucx.ucp.UcpRequest;
import org.openucx.jucx.UcxException;
import org.openucx.jucx.ucp.*;
import org.openucx.jucx.UcxUtils;
import org.openucx.jucx.ucp.UcpEndpoint;
import org.openucx.jucx.ucp.UcpEndpointParams;
import org.openucx.jucx.ucp.UcpMemory;
import org.openucx.jucx.ucs.UcsConstants;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

Expand All @@ -28,6 +27,13 @@ public static void main(String[] args) throws Exception {
String serverHost = argsMap.get("s");
UcpEndpoint endpoint = worker.newEndpoint(new UcpEndpointParams()
.setPeerErrorHandlingMode()
.setErrorHandler((ep, status, errorMsg) -> {
if (status == UcsConstants.STATUS.UCS_ERR_CONNECTION_RESET) {
throw new ConnectException(errorMsg);
} else {
throw new UcxException(errorMsg);
}
})
.setSocketAddress(new InetSocketAddress(serverHost, serverPort)));

UcpMemory memory = context.memoryMap(allocationParams);
Expand All @@ -49,22 +55,25 @@ public static void main(String[] args) throws Exception {

// Send memory metadata and wait until receiver will finish benchmark.
endpoint.sendTaggedNonBlocking(sendData, null);
ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096);
UcpRequest recvRequest = worker.recvTaggedNonBlocking(recvBuffer,
new UcxCallback() {
@Override
public void onSuccess(UcpRequest request) {
System.out.println("Received a message:");
System.out.println(recvBuffer.asCharBuffer().toString().trim());
}
});

worker.progressRequest(recvRequest);

UcpRequest closeRequest = endpoint.closeNonBlockingFlush();
worker.progressRequest(closeRequest);
resources.push(closeRequest);
try {
while (true) {
if (worker.progress() == 0) {
worker.waitForEvents();
}
}
} catch (ConnectException ignored) {
} catch (Exception ex) {
System.err.println(ex.getMessage());
}

closeResources();
try {
UcpRequest closeRequest = endpoint.closeNonBlockingForce();
resources.push(closeRequest);
worker.progressRequest(closeRequest);
} catch (Exception ignored) {
} finally {
closeResources();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ public interface UcpEndpointErrorHandler {
* all subsequent operations on this ep will fail with
* the error code passed in {@code status}.
*/
void onError(UcpEndpoint ep, int status, String errorMsg);
void onError(UcpEndpoint ep, int status, String errorMsg) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public void close() {
* This routine explicitly progresses all communication operations on a worker.
* @return Non-zero if any communication was progressed, zero otherwise.
*/
public int progress() {
public int progress() throws Exception {
return progressWorkerNative(getNativeId());
}

/**
* Blocking progress for request until it's not completed.
*/
public void progressRequest(UcpRequest request) {
public void progressRequest(UcpRequest request) throws Exception {
while (!request.isCompleted()) {
progress();
}
Expand Down Expand Up @@ -251,7 +251,7 @@ public ByteBuffer getAddress() {

private static native void releaseAddressNative(long workerId, ByteBuffer addressId);

private static native int progressWorkerNative(long workerId);
private static native int progressWorkerNative(long workerId) throws Exception;

private static native UcpRequest flushNonBlockingNative(long workerId, UcxCallback callback);

Expand Down
64 changes: 64 additions & 0 deletions bindings/java/src/main/java/org/openucx/jucx/ucs/UcsConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,70 @@ public static class ThreadMode {
public static int UCS_THREAD_MODE_MULTI;
}

/**
* Status codes
*/
public static class STATUS {
static {
load();
}

/* Operation completed successfully */
public static int UCS_OK;

/* Operation is queued and still in progress */
public static int UCS_INPROGRESS;

/* Failure codes */
public static int UCS_ERR_NO_MESSAGE;
public static int UCS_ERR_NO_RESOURCE;
public static int UCS_ERR_IO_ERROR;
public static int UCS_ERR_NO_MEMORY;
public static int UCS_ERR_INVALID_PARAM;
public static int UCS_ERR_UNREACHABLE;
public static int UCS_ERR_INVALID_ADDR;
public static int UCS_ERR_NOT_IMPLEMENTED;
public static int UCS_ERR_MESSAGE_TRUNCATED;
public static int UCS_ERR_NO_PROGRESS;
public static int UCS_ERR_BUFFER_TOO_SMALL;
public static int UCS_ERR_NO_ELEM;
public static int UCS_ERR_SOME_CONNECTS_FAILED;
public static int UCS_ERR_NO_DEVICE;
public static int UCS_ERR_BUSY;
public static int UCS_ERR_CANCELED;
public static int UCS_ERR_SHMEM_SEGMENT;
public static int UCS_ERR_ALREADY_EXISTS;
public static int UCS_ERR_OUT_OF_RANGE;
public static int UCS_ERR_TIMED_OUT;
public static int UCS_ERR_EXCEEDS_LIMIT;
public static int UCS_ERR_UNSUPPORTED;
public static int UCS_ERR_REJECTED;
public static int UCS_ERR_NOT_CONNECTED;
public static int UCS_ERR_CONNECTION_RESET;

public static int UCS_ERR_FIRST_LINK_FAILURE;
public static int UCS_ERR_LAST_LINK_FAILURE;
public static int UCS_ERR_FIRST_ENDPOINT_FAILURE;
public static int UCS_ERR_ENDPOINT_TIMEOUT;
public static int UCS_ERR_LAST_ENDPOINT_FAILURE;

public static int UCS_ERR_LAST;
}

public static class MEMORY_TYPE {
static {
load();
}

public static int UCS_MEMORY_TYPE_HOST; // Default system memory
public static int UCS_MEMORY_TYPE_CUDA; // NVIDIA CUDA memory
public static int UCS_MEMORY_TYPE_CUDA_MANAGED; // NVIDIA CUDA managed (or unified) memory
public static int UCS_MEMORY_TYPE_ROCM; // AMD ROCM memory
public static int UCS_MEMORY_TYPE_ROCM_MANAGED; // AMD ROCM managed system memory
public static int UCS_MEMORY_TYPE_LAST;
public static int UCS_MEMORY_TYPE_UNKNOWN;
}

private static void load() {
NativeLibs.load();
loadConstants();
Expand Down
5 changes: 3 additions & 2 deletions bindings/java/src/main/native/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ MOSTLYCLEANFILES = $(JUCX_GENERATED_H_FILES) $(STAMP_FILE)
#
$(STAMP_FILE): \
$(javadir)/src/main/java/org/openucx/jucx/ucs/*.java \
$(javadir)/src/main/java/org/openucx/jucx/ucp/*.java
$(javadir)/src/main/java/org/openucx/jucx/ucp/*.java \
$(javadir)/src/main/java/org/openucx/jucx/examples/*.java
$(MVNCMD) compile
touch $(STAMP_FILE)

Expand Down Expand Up @@ -73,7 +74,7 @@ libjucx_la_LIBADD = $(topdir)/src/ucs/libucs.la \
libjucx_la_DEPENDENCIES = Makefile.am Makefile.in Makefile

# Compile Java source code and pack to jar
$(jarfile):
$(jarfile): libjucx.la
$(MVNCMD) package -DskipTests

package : $(jarfile)
Expand Down
Loading

0 comments on commit 087e922

Please sign in to comment.