Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugins): spring custom plugins #10389

Merged
merged 31 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a2c4abb
fix(mae): fix mae standalone platform consumer
david-leifker Apr 22, 2024
b70d28b
spring custom plugins
david-leifker Apr 23, 2024
45f563b
Merge remote-tracking branch 'datahubproject/master' into spring-cust…
david-leifker Apr 25, 2024
fce41e8
spring plugins
david-leifker Apr 25, 2024
f6d5f82
Merge remote-tracking branch 'datahubproject/master' into spring-cust…
david-leifker Apr 25, 2024
226794c
allow null context, skip spring if no package scan location
david-leifker Apr 25, 2024
b7417c2
create custom plugin dependency library
david-leifker Apr 29, 2024
519b8d2
Merge branch 'master' into spring-custom-plugins
david-leifker May 3, 2024
24d8d57
tweak shadow jar, jackson and antlr
david-leifker May 6, 2024
74cd310
spring custom plugins
david-leifker Apr 23, 2024
5e81000
spring plugins
david-leifker Apr 25, 2024
69a195b
allow null context, skip spring if no package scan location
david-leifker Apr 25, 2024
8df7d3f
create custom plugin dependency library
david-leifker Apr 29, 2024
9da5da3
tweak shadow jar, jackson and antlr
david-leifker May 6, 2024
38f3fe6
fix(openapi): ignore casing of aspect names
david-leifker May 7, 2024
dda060f
Merge branch 'spring-custom-plugins' of github.com:datahub-project/da…
david-leifker May 7, 2024
2b6a64e
fix doc build
david-leifker May 7, 2024
33fc3ce
docs: note breaking change for constructor of custom plugins
david-leifker May 7, 2024
26b4597
Merge branch 'master' into spring-custom-plugins
david-leifker May 7, 2024
4b2a195
remove value from aspect request schemas (#10450)
kevin1chun May 7, 2024
c1fce8f
Merge branch 'master' into spring-custom-plugins
kevin1chun May 8, 2024
7780bc1
Adding nullable to open api models (#10463)
kevin1chun May 9, 2024
61f8345
Merge remote-tracking branch 'origin/master' into spring-custom-plugins
david-leifker May 9, 2024
6364c10
Adjust artifact name
david-leifker May 9, 2024
caaf722
feat(custom-plugin): improve class scanning
david-leifker May 9, 2024
b22b0cf
Merge remote-tracking branch 'origin/master' into spring-custom-plugins
david-leifker May 9, 2024
00e5a7f
jdk8 support
david-leifker May 9, 2024
6961836
Merge branch 'master' into spring-custom-plugins
david-leifker May 9, 2024
d1e9dfb
bugfix in openapi params (#10474)
kevin1chun May 9, 2024
b66d2c9
Merge remote-tracking branch 'origin/master' into spring-custom-plugins
david-leifker May 9, 2024
c6a4bf0
lint, temp disable jar publish
david-leifker May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ project.ext.externalDependency = [
'jetbrains':' org.jetbrains.kotlin:kotlin-stdlib:1.6.0',
'annotationApi': 'javax.annotation:javax.annotation-api:1.3.2',
'jakartaAnnotationApi': 'jakarta.annotation:jakarta.annotation-api:3.0.0',
'classGraph': 'io.github.classgraph:classgraph:4.8.168',
'classGraph': 'io.github.classgraph:classgraph:4.8.172',
]

allprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.linkedin.datahub.graphql.generated.SearchSuggestion;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.metadata.entity.validation.ValidationUtils;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.utils.SearchUtils;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -89,7 +89,7 @@ public static List<MatchedField> getMatchedFieldEntry(
if (SearchUtils.isUrn(field.getValue())) {
try {
Urn urn = Urn.createFromString(field.getValue());
ValidationUtils.validateUrn(
ValidationApiUtils.validateUrn(
context.getOperationContext().getEntityRegistry(), urn);
matchedField.setEntity(UrnToEntityMapper.map(context, urn));
} catch (IllegalArgumentException | URISyntaxException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.MatchedField;
import com.linkedin.metadata.entity.validation.ValidationUtils;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.snapshot.Snapshot;
Expand Down Expand Up @@ -42,7 +42,7 @@ public void testMatchedFieldValidation() throws URISyntaxException {
"urn:li:dataset:%28urn:li:dataPlatform:s3%2Ctest-datalake-concepts/prog_maintenance%2CPROD%29");
assertThrows(
IllegalArgumentException.class,
() -> ValidationUtils.validateUrn(entityRegistry, invalidUrn));
() -> ValidationApiUtils.validateUrn(entityRegistry, invalidUrn));

QueryContext mockContext = mock(QueryContext.class);
when(mockContext.getOperationContext())
Expand Down
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Breaking Changes

- #10419 - `aws_region` is now a required configuration in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
- #10389 - Custom validators, mutators, side-effects dropped a previously required constructor
- #10472 - `RVW` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases.

### Potential Downtime
Expand Down
2 changes: 1 addition & 1 deletion entity-registry/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {
implementation externalDependency.jacksonDataFormatYaml
implementation externalDependency.reflections

implementation externalDependency.jsonPatch
api externalDependency.jsonPatch
implementation externalDependency.jsonPathImpl

constraints {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

@Getter
@Setter
@Accessors(chain = true)
public class StructuredPropertiesSoftDelete extends MutationHook {
public StructuredPropertiesSoftDelete(AspectPluginConfig aspectPluginConfig) {
super(aspectPluginConfig);
}
@Nonnull private AspectPluginConfig config;

@Override
protected Stream<Pair<ReadItem, Boolean>> readMutation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -24,21 +26,13 @@
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;

@Slf4j
public class PluginFactory {

private static final String[] VALIDATOR_PACKAGES = {
"com.linkedin.metadata.aspect.plugins.validation", "com.linkedin.metadata.aspect.validation"
};
private static final String[] HOOK_PACKAGES = {
"com.linkedin.metadata.aspect.plugins.hooks", "com.linkedin.metadata.aspect.hooks"
};

public static PluginFactory withCustomClasspath(
@Nullable PluginConfiguration pluginConfiguration, @Nonnull List<ClassLoader> classLoaders) {
return new PluginFactory(pluginConfiguration, classLoaders);
return new PluginFactory(pluginConfiguration, classLoaders).loadPlugins();
}

public static PluginFactory withConfig(@Nullable PluginConfiguration pluginConfiguration) {
Expand All @@ -49,44 +43,135 @@ public static PluginFactory empty() {
return PluginFactory.withConfig(PluginConfiguration.EMPTY);
}

public static PluginFactory merge(PluginFactory a, PluginFactory b) {
return PluginFactory.withCustomClasspath(
PluginConfiguration.merge(a.getPluginConfiguration(), b.getPluginConfiguration()),
public static PluginFactory merge(
PluginFactory a,
PluginFactory b,
@Nullable
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider) {
PluginConfiguration mergedPluginConfig =
PluginConfiguration.merge(a.pluginConfiguration, b.pluginConfiguration);
List<ClassLoader> mergedClassLoaders =
Stream.concat(a.getClassLoaders().stream(), b.getClassLoaders().stream())
.collect(Collectors.toList()));
.collect(Collectors.toList());

if (pluginFactoryProvider != null) {
return pluginFactoryProvider.apply(mergedPluginConfig, mergedClassLoaders);
} else {
return PluginFactory.withCustomClasspath(mergedPluginConfig, mergedClassLoaders);
}
}

@Getter private final PluginConfiguration pluginConfiguration;
@Nonnull @Getter private final List<ClassLoader> classLoaders;
@Getter private final List<AspectPayloadValidator> aspectPayloadValidators;
@Getter private final List<MutationHook> mutationHooks;
@Getter private final List<MCLSideEffect> mclSideEffects;
@Getter private final List<MCPSideEffect> mcpSideEffects;
@Getter private List<AspectPayloadValidator> aspectPayloadValidators;
@Getter private List<MutationHook> mutationHooks;
@Getter private List<MCLSideEffect> mclSideEffects;
@Getter private List<MCPSideEffect> mcpSideEffects;

private final ClassGraph classGraph;
private static final Map<Long, List<PluginSpec>> pluginCache = new ConcurrentHashMap<>();

public PluginFactory(
@Nullable PluginConfiguration pluginConfiguration, @Nonnull List<ClassLoader> classLoaders) {
this.classGraph =
new ClassGraph()
.acceptPackages(ArrayUtils.addAll(HOOK_PACKAGES, VALIDATOR_PACKAGES))
.enableRemoteJarScanning()
.enableExternalClasses()
.enableClassInfo()
.enableMethodInfo();

this.classLoaders = classLoaders;

if (!this.classLoaders.isEmpty()) {
classLoaders.forEach(this.classGraph::addClassLoader);
}

this.pluginConfiguration =
pluginConfiguration == null ? PluginConfiguration.EMPTY : pluginConfiguration;
}

public PluginFactory loadPlugins() {
this.aspectPayloadValidators = buildAspectPayloadValidators(this.pluginConfiguration);
this.mutationHooks = buildMutationHooks(this.pluginConfiguration);
this.mclSideEffects = buildMCLSideEffects(this.pluginConfiguration);
this.mcpSideEffects = buildMCPSideEffects(this.pluginConfiguration);
return this;
}

/**
* Memory intensive operation because of the size of the jars. Limit packages, classes scanned,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the limit being applied?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The packages and class names are used in the acceptPackages and acceptClass

new ClassGraph()
.acceptPackages(packageNames.stream().distinct().toArray(String[]::new) .acceptClasses(classNames.stream().distinct().toArray(String[]::new))

* cache results
*
* @param configs plugin configurations
* @return auto-closeable scan result
*/
protected static <T extends PluginSpec> List<T> initPlugins(
@Nonnull List<ClassLoader> classLoaders,
@Nonnull Class<?> baseClazz,
@Nonnull List<String> packageNames,
@Nonnull List<AspectPluginConfig> configs) {

List<String> classNames =
configs.stream().map(AspectPluginConfig::getClassName).collect(Collectors.toList());

if (classNames.isEmpty()) {
return Collections.emptyList();
} else {
long key =
IntStream.concat(
classLoaders.stream().mapToInt(Object::hashCode),
IntStream.concat(
IntStream.of(baseClazz.getName().hashCode()),
configs.stream().mapToInt(AspectPluginConfig::hashCode)))
.sum();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, so this is creating a key like
sum ( classLoaders.hashCode..., aspectPluginConfigClass.hashCode... )

Is this guaranteed to be unique?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unique within a given jvm execution. It might leak a bit when/if jars are being constantly replaced since each time that happens we'll be creating orphans. This was primarily added to help with tests where we run multiple concurrent test threads and memory usage was triggering OOM.


return (List<T>)
pluginCache.computeIfAbsent(
key,
k -> {
try {
ClassGraph classGraph =
new ClassGraph()
.acceptPackages(packageNames.stream().distinct().toArray(String[]::new))
.acceptClasses(classNames.stream().distinct().toArray(String[]::new))
.enableRemoteJarScanning()
.enableExternalClasses()
.enableClassInfo()
.enableMethodInfo();
if (!classLoaders.isEmpty()) {
classLoaders.forEach(classGraph::addClassLoader);
}

try (ScanResult scanResult = classGraph.scan()) {
Map<String, ClassInfo> classMap =
scanResult.getSubclasses(baseClazz).stream()
.collect(Collectors.toMap(ClassInfo::getName, Function.identity()));

return configs.stream()
.map(
config -> {
try {
ClassInfo classInfo = classMap.get(config.getClassName());
if (classInfo == null) {
throw new IllegalStateException(
String.format(
"The following class cannot be loaded: %s",
config.getClassName()));
}
MethodInfo constructorMethod =
classInfo.getConstructorInfo().get(0);
return ((T)
constructorMethod
.loadClassAndGetConstructor()
.newInstance())
.setConfig(config);
} catch (Exception e) {
log.error(
"Error constructing entity registry plugin class: {}",
config.getClassName(),
e);
return Stream.<T>empty();
}
})
.map(plugin -> (T) plugin)
.filter(PluginSpec::enabled)
.collect(Collectors.toList());
}
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"Failed to load entity registry plugins: %s.", baseClazz.getName()),
e);
}
});
}
}

/**
Expand Down Expand Up @@ -187,68 +272,67 @@ private List<AspectPayloadValidator> buildAspectPayloadValidators(
: applyDisable(
build(
AspectPayloadValidator.class,
pluginConfiguration.getAspectPayloadValidators(),
VALIDATOR_PACKAGES));
pluginConfiguration.validatorPackages(),
pluginConfiguration.getAspectPayloadValidators()));
}

private List<MutationHook> buildMutationHooks(@Nullable PluginConfiguration pluginConfiguration) {
return pluginConfiguration == null
? Collections.emptyList()
: applyDisable(
build(MutationHook.class, pluginConfiguration.getMutationHooks(), HOOK_PACKAGES));
build(
MutationHook.class,
pluginConfiguration.mutationPackages(),
pluginConfiguration.getMutationHooks()));
}

private List<MCLSideEffect> buildMCLSideEffects(
@Nullable PluginConfiguration pluginConfiguration) {
return pluginConfiguration == null
? Collections.emptyList()
: applyDisable(
build(MCLSideEffect.class, pluginConfiguration.getMclSideEffects(), HOOK_PACKAGES));
build(
MCLSideEffect.class,
pluginConfiguration.mclSideEffectPackages(),
pluginConfiguration.getMclSideEffects()));
}

private List<MCPSideEffect> buildMCPSideEffects(
@Nullable PluginConfiguration pluginConfiguration) {
return pluginConfiguration == null
? Collections.emptyList()
: applyDisable(
build(MCPSideEffect.class, pluginConfiguration.getMcpSideEffects(), HOOK_PACKAGES));
build(
MCPSideEffect.class,
pluginConfiguration.mcpSideEffectPackages(),
pluginConfiguration.getMcpSideEffects()));
}

private <T> List<T> build(
Class<?> baseClazz, List<AspectPluginConfig> configs, String... packageNames) {
try (ScanResult scanResult = classGraph.acceptPackages(packageNames).scan()) {

Map<String, ClassInfo> classMap =
scanResult.getSubclasses(baseClazz).stream()
.collect(Collectors.toMap(ClassInfo::getName, Function.identity()));

return configs.stream()
.flatMap(
config -> {
try {
ClassInfo classInfo = classMap.get(config.getClassName());
if (classInfo == null) {
throw new IllegalStateException(
String.format(
"The following class cannot be loaded: %s", config.getClassName()));
}
MethodInfo constructorMethod = classInfo.getConstructorInfo().get(0);
return Stream.of(
(T) constructorMethod.loadClassAndGetConstructor().newInstance(config));
} catch (Exception e) {
log.error(
"Error constructing entity registry plugin class: {}",
config.getClassName(),
e);
return Stream.empty();
}
})
.collect(Collectors.toList());
/**
* Load plugins given the base class (i.e. a validator) and the name of the implementing class
* found in the configuration objects.
*
* <p>For performance reasons, scan the packages found in packageNames
*
* <p>Designed to avoid any Spring dependency, see alternative implementation for Spring
*
* @param baseClazz base class for the plugin
* @param configs configuration with implementing class information
* @param packageNames package names to scan
* @return list of plugin instances
* @param <T> the plugin class
*/
protected <T extends PluginSpec> List<T> build(
Class<?> baseClazz, List<String> packageNames, List<AspectPluginConfig> configs) {
List<AspectPluginConfig> nonSpringConfigs =
configs.stream()
.filter(
config ->
config.getSpring() == null
|| Boolean.FALSE.equals(config.getSpring().isEnabled()))
.collect(Collectors.toList());

} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Failed to load entity registry plugins: %s.", baseClazz.getName()), e);
}
return initPlugins(classLoaders, baseClazz, packageNames, nonSpringConfigs);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
public abstract class PluginSpec {
protected static String ENTITY_WILDCARD = "*";

private final AspectPluginConfig aspectPluginConfig;
@Nonnull
public abstract AspectPluginConfig getConfig();

protected AspectPluginConfig getConfig() {
return this.aspectPluginConfig;
public abstract PluginSpec setConfig(@Nonnull AspectPluginConfig config);

public boolean enabled() {
return true;
}

public boolean shouldApply(
Expand Down
Loading
Loading