Skip to content

Commit

Permalink
Add stdout/stderr selection.
Browse files Browse the repository at this point in the history
  • Loading branch information
Attila Zsolt Piros committed Feb 18, 2023
1 parent 7c6a142 commit 948efac
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 81 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,15 @@ In this case all the rules are used from both the internal and external action f
In distributed environment when external action file is used you should take care on each node the action file is really can be accessed using the path.
Otherwise the error is logged but the application continues: "TraceAgent does not find the external action file: <file>".

#### Select the target output stream: stderr/stdout

By default TraceAgent traces to the `stdout` but `stderr` can be selected by passing `targetStream:stderr` to the agent:

```
java -javaagent:target/trace-agent-1.0-SNAPSHOT.jar="targetStream:stderr" -jar ../testartifact/target/testartifact-1.0-SNAPSHOT.jar
```


#### Enable agent logging

To troubleshoot the process of class transformation and instrumentation verbose logging on
Expand Down Expand Up @@ -587,6 +596,14 @@ When the submit process itself need to be traced then in the `SPARK_SUBMIT_OPTS`
export SPARK_SUBMIT_OPTS="-javaagent:trace-agent-0.0.8.jar"
```

## YARN AM

When the YARN resource allocation is need to be traced at client mode the `spark.yarn.am.extraJavaOptions` must be used:

```
--conf spark.yarn.am.extraJavaOptions="-javaagent:trace-agent-0.0.8.jar" --jars trace-agent-0.0.8.jar
```

## Spark driver client mode

In case of client mode when the driver is at node where you call spark-submit at you can simply start Spark with the config
Expand Down
16 changes: 16 additions & 0 deletions trace-agent/src/main/java/net/test/GlobalArguments.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package net.test;

import java.io.PrintStream;

public class GlobalArguments {

private final PrintStream targetStream;

public GlobalArguments(PrintStream targetStream) {
this.targetStream = targetStream;
}

public PrintStream getTargetStream() {
return targetStream;
}
}
31 changes: 17 additions & 14 deletions trace-agent/src/main/java/net/test/TraceAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ class TraceAction {

private final String actionArgs;

public TraceAction(String actionId, String classMatcherExp, String methodMatcherExp, String actionArgs) {
private final GlobalArguments globalArguments;

public TraceAction(GlobalArguments globalArguments, String actionId, String classMatcherExp, String methodMatcherExp, String actionArgs) {
this.actionId = actionId;
this.classMatcherExp = classMatcherExp;
this.methodMatcherExp = methodMatcherExp;
this.actionArgs = actionArgs;
this.globalArguments = globalArguments;
}

public TraceAction(String actionId, String classMatcherExp, String methodMatcherExp) {
this(actionId, classMatcherExp, methodMatcherExp, null);
public TraceAction(GlobalArguments globalArguments, String actionId, String classMatcherExp, String methodMatcherExp) {
this(globalArguments, actionId, classMatcherExp, methodMatcherExp, null);
}

@Override
Expand All @@ -36,27 +39,27 @@ public String toString() {
public Object getActionInterceptor(DefaultArguments defaultArguments) {
final Object interceptor;
if (actionId.equals(TimingInterceptorNano.NAME)) {
interceptor = new TimingInterceptorNano(actionArgs, defaultArguments);
interceptor = new TimingInterceptorNano(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(TimingInterceptorMs.NAME)) {
interceptor = new TimingInterceptorMs(actionArgs, defaultArguments);
interceptor = new TimingInterceptorMs(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(StackTraceInterceptor.NAME)) {
interceptor = new StackTraceInterceptor(actionArgs, defaultArguments);
interceptor = new StackTraceInterceptor(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(TraceArgsInterceptor.NAME)) {
interceptor = new TraceArgsInterceptor(actionArgs, defaultArguments);
interceptor = new TraceArgsInterceptor(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(TraceRetValueInterceptor.NAME)) {
interceptor = new TraceRetValueInterceptor(actionArgs, defaultArguments);
interceptor = new TraceRetValueInterceptor(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(CounterInterceptor.NAME)) {
interceptor = new CounterInterceptor(actionArgs, defaultArguments);
interceptor = new CounterInterceptor(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(AvgTimingInterceptorMs.NAME)) {
interceptor = new AvgTimingInterceptorMs(actionArgs, defaultArguments);
interceptor = new AvgTimingInterceptorMs(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(TraceLoginConfigInterceptor.NAME)) {
interceptor = new TraceLoginConfigInterceptor(actionArgs, defaultArguments);
interceptor = new TraceLoginConfigInterceptor(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(TraceArgsWithMethodCallInterceptor.NAME)) {
interceptor = new TraceArgsWithMethodCallInterceptor(actionArgs, defaultArguments);
interceptor = new TraceArgsWithMethodCallInterceptor(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(DiagnosticCommandInterceptor.NAME)) {
interceptor = new DiagnosticCommandInterceptor(actionArgs, defaultArguments);
interceptor = new DiagnosticCommandInterceptor(globalArguments, actionArgs, defaultArguments);
} else if (actionId.equals(HeapDumpCommandInterceptor.NAME)) {
interceptor = new HeapDumpCommandInterceptor(actionArgs, defaultArguments);
interceptor = new HeapDumpCommandInterceptor(globalArguments, actionArgs, defaultArguments);
} else {
System.err.println("TraceAgent detected an invalid action: " + actionId);
interceptor = null;
Expand Down
5 changes: 3 additions & 2 deletions trace-agent/src/main/java/net/test/TraceAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ public class TraceAgent {

private TraceAction readAction(String line) {
String[] actionWithArgs = line.split("\\s+");
final GlobalArguments globalArgs = new GlobalArguments(traceAgentArgs.getTargetStream());
final TraceAction traceAction;
if (actionWithArgs.length == 4) {
traceAction = new TraceAction(actionWithArgs[0], actionWithArgs[1], actionWithArgs[2], actionWithArgs[3]);
traceAction = new TraceAction(globalArgs, actionWithArgs[0], actionWithArgs[1], actionWithArgs[2], actionWithArgs[3]);
} else if (actionWithArgs.length == 3) {
traceAction = new TraceAction(actionWithArgs[0], actionWithArgs[1], actionWithArgs[2]);
traceAction = new TraceAction(globalArgs, actionWithArgs[0], actionWithArgs[1], actionWithArgs[2]);
} else {
traceAction = null;
}
Expand Down
15 changes: 14 additions & 1 deletion trace-agent/src/main/java/net/test/TraceAgentArgs.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.test;

import java.io.PrintStream;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Map;
Expand All @@ -9,6 +10,7 @@ public class TraceAgentArgs implements DefaultArguments {
private static final String EXTERNAL_ACTION_FILE_PATH = "actionsFile";
private static final String ENABLE_AGENT_LOG = "enableAgentLog";
private static final String DATE_TIME_FORMAT = "dateTimeFormat";
private static final String TARGET_STREAM = "targetStream";

private final String externalActionFilePath;

Expand All @@ -18,10 +20,17 @@ public class TraceAgentArgs implements DefaultArguments {

private final Boolean isDateLoggedFlag;

private final PrintStream targetStream;

public TraceAgentArgs(String arguments) {
Map<String, String> parsedArgs =
ArgUtils.parseOptionalArgs(Arrays.asList(EXTERNAL_ACTION_FILE_PATH, ENABLE_AGENT_LOG, DATE_TIME_FORMAT, CommonActionArgs.IS_DATE_LOGGED), arguments);
ArgUtils.parseOptionalArgs(Arrays.asList(EXTERNAL_ACTION_FILE_PATH, ENABLE_AGENT_LOG, DATE_TIME_FORMAT, CommonActionArgs.IS_DATE_LOGGED, TARGET_STREAM), arguments);
this.externalActionFilePath = parsedArgs.get(EXTERNAL_ACTION_FILE_PATH);
if (parsedArgs.getOrDefault(TARGET_STREAM, "stdout").equals("stderr")) {
this.targetStream = System.err;
} else {
this.targetStream = System.out;
}
this.enableAgentLog = Boolean.valueOf(parsedArgs.get(ENABLE_AGENT_LOG));

final String dateTimeFormatStr = parsedArgs.get(DATE_TIME_FORMAT);
Expand Down Expand Up @@ -50,4 +59,8 @@ public DateTimeFormatter getDateTimeFormatter() {
public boolean isDateLogged() {
return this.isDateLoggedFlag;
}

public PrintStream getTargetStream() {
return targetStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import net.test.ArgumentsCollection;
import net.test.CommonActionArgs;
import net.test.DefaultArguments;
import net.test.GlobalArguments;

import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
Expand All @@ -27,15 +28,18 @@ public class AvgTimingInterceptorMs {

private final int window_length;

private final GlobalArguments globalArguments;

private volatile long window_min = 0;
private volatile long window_max = 0;
private volatile long window_sum = 0;
private volatile int window_index = 0;

public AvgTimingInterceptorMs(String actionArgs, DefaultArguments defaults) {
public AvgTimingInterceptorMs(GlobalArguments globalArguments, String actionArgs, DefaultArguments defaults) {
ArgumentsCollection parsed = ArgUtils.parseOptionalArgs(KNOWN_ARGS, actionArgs);
this.commonActionArgs = new CommonActionArgs(parsed, defaults);
this.window_length = parsed.parseInt(WINDOW_LENGTH, 100);
this.globalArguments = globalArguments;
}

@RuntimeType
Expand All @@ -57,20 +61,22 @@ public Object intercept(@Origin Method method, @SuperCall Callable<?> callable)
window_max = elapsedTime;
}
if (window_index == window_length) {
System.out.println(
commonActionArgs.addPrefix(
"TraceAgent ("
+ NAME
+ "): `"
+ method
+ "` window_length: "
+ window_length
+ " min: "
+ window_min
+ " avg: "
+ window_sum / window_length
+ " max: "
+ window_max));
globalArguments
.getTargetStream()
.println(
commonActionArgs.addPrefix(
"TraceAgent ("
+ NAME
+ "): `"
+ method
+ "` window_length: "
+ window_length
+ " min: "
+ window_min
+ " avg: "
+ window_sum / window_length
+ " max: "
+ window_max));
window_index = 0;
window_sum = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import net.test.ArgumentsCollection;
import net.test.CommonActionArgs;
import net.test.DefaultArguments;
import net.test.GlobalArguments;

import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
Expand Down Expand Up @@ -31,17 +32,20 @@ public class CounterInterceptor {

private long counter = 0;

public CounterInterceptor(String actionArgs, DefaultArguments defaults) {
private final GlobalArguments globalArguments;

public CounterInterceptor(GlobalArguments globalArguments, String actionArgs, DefaultArguments defaults) {
ArgumentsCollection parsed = ArgUtils.parseOptionalArgs(KNOWN_ARGS, actionArgs);
this.commonActionArgs = new CommonActionArgs(parsed, defaults);
this.countFrequency = parsed.parseInt(COUNT_FREQUENCY, 100);
this.globalArguments = globalArguments;
}

@RuntimeType
public Object intercept(@Origin Method method, @AllArguments Object[] allArguments, @SuperCall Callable<?> callable) throws Exception {
counter++;
if (counter % countFrequency == 0) {
System.out.println(commonActionArgs.addPrefix("TraceAgent (counter): `" + method + "` called " + counter));
globalArguments.getTargetStream().println(commonActionArgs.addPrefix("TraceAgent (counter): `" + method + "` called " + counter));
}
return callable.call();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import net.test.ArgumentsCollection;
import net.test.CommonActionArgs;
import net.test.DefaultArguments;
import net.test.GlobalArguments;

import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
Expand Down Expand Up @@ -57,7 +58,9 @@ private static ObjectName createDiagObj() {

private final boolean withGC;

public DiagnosticCommandInterceptor(String actionArgs, DefaultArguments defaults) {
private final GlobalArguments globalArguments;

public DiagnosticCommandInterceptor(GlobalArguments globalArguments, String actionArgs, DefaultArguments defaults) {
ArgumentsCollection parsed = ArgUtils.parseOptionalArgs(KNOWN_ARGS, actionArgs);
this.commonActionArgs = new CommonActionArgs(parsed, defaults);
this.command = parsed.get(COMMAND);
Expand All @@ -78,10 +81,11 @@ public DiagnosticCommandInterceptor(String actionArgs, DefaultArguments defaults
isAfter = true;
break;
default:
System.out.println("TraceAgent: (diagnostic_command / " + command + ") invalid value for `where`: " + where + ". Action is switched off!");
globalArguments.getTargetStream().println("TraceAgent: (diagnostic_command / " + command + ") invalid value for `where`: " + where + ". Action is switched off!");
isBefore = false;
isAfter = false;
}
this.globalArguments = globalArguments;
}

private String invokeNoStringArgumentsCommand(final String operationName) {
Expand Down Expand Up @@ -118,7 +122,7 @@ private void forceGC() {
obj = null;
invokeNoStringArgumentsCommand("GC.run");
while (ref.get() != null) {
System.out.println("TraceAgent (diagnostic_command) call System.gc()");
globalArguments.getTargetStream().println("TraceAgent (diagnostic_command) call System.gc()");
System.gc();
}
}
Expand All @@ -132,14 +136,16 @@ public Object intercept(@Origin Method method, @SuperCall Callable<?> callable)
if (withGC) {
forceGC();
}
System.out.println(
commonActionArgs.addPrefix(
"TraceAgent (diagnostic_command / "
+ command
+ "): at the beginning of `"
+ method
+ "`:\n"
+ getFirstLines(invokeNoStringArgumentsCommand(command), limitForOutputLines)));
globalArguments
.getTargetStream()
.println(
commonActionArgs.addPrefix(
"TraceAgent (diagnostic_command / "
+ command
+ "): at the beginning of `"
+ method
+ "`:\n"
+ getFirstLines(invokeNoStringArgumentsCommand(command), limitForOutputLines)));
}
try {
return callable.call();
Expand All @@ -148,14 +154,16 @@ public Object intercept(@Origin Method method, @SuperCall Callable<?> callable)
if (withGC) {
forceGC();
}
System.out.println(
commonActionArgs.addPrefix(
"TraceAgent (diagnostic_command / "
+ command
+ "): at the end of `"
+ method
+ "`:\n"
+ getFirstLines(invokeNoStringArgumentsCommand(command), limitForOutputLines)));
globalArguments
.getTargetStream()
.println(
commonActionArgs.addPrefix(
"TraceAgent (diagnostic_command / "
+ command
+ "): at the end of `"
+ method
+ "`:\n"
+ getFirstLines(invokeNoStringArgumentsCommand(command), limitForOutputLines)));
}
}
}
Expand Down
Loading

0 comments on commit 948efac

Please sign in to comment.