Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
llance authored Jul 31, 2024
2 parents 5c4211c + 4512209 commit 9e9b79a
Show file tree
Hide file tree
Showing 97 changed files with 5,398 additions and 274 deletions.
31 changes: 23 additions & 8 deletions .github/scripts/check_python_package.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
import setuptools
import os

folders = ["./smoke-test/tests"]

for folder in folders:
print(f"Checking folder {folder}")
a = [i for i in setuptools.find_packages(folder) if "cypress" not in i]
b = [i for i in setuptools.find_namespace_packages(folder) if "cypress" not in i]
packages = [i for i in setuptools.find_packages(folder) if "cypress" not in i]
namespace_packages = [
i for i in setuptools.find_namespace_packages(folder) if "cypress" not in i
]

in_a_not_b = set(a) - set(b)
in_b_not_a = set(b) - set(a)
print("Packages found:", packages)
print("Namespace packages found:", namespace_packages)

in_packages_not_namespace = set(packages) - set(namespace_packages)
in_namespace_not_packages = set(namespace_packages) - set(packages)

if in_packages_not_namespace:
print(f"Packages not in namespace packages: {in_packages_not_namespace}")
if in_namespace_not_packages:
print(f"Namespace packages not in packages: {in_namespace_not_packages}")
for pkg in in_namespace_not_packages:
pkg_path = os.path.join(folder, pkg.replace(".", os.path.sep))
print(f"Contents of {pkg_path}:")
print(os.listdir(pkg_path))

assert (
len(in_a_not_b) == 0
), f"Found packages in {folder} that are not in namespace packages: {in_a_not_b}"
len(in_packages_not_namespace) == 0
), f"Found packages in {folder} that are not in namespace packages: {in_packages_not_namespace}"
assert (
len(in_b_not_a) == 0
), f"Found namespace packages in {folder} that are not in packages: {in_b_not_a}"
len(in_namespace_not_packages) == 0
), f"Found namespace packages in {folder} that are not in packages: {in_namespace_not_packages}"
2 changes: 1 addition & 1 deletion .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
extra_pip_requirements: 'apache-airflow~=2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt'
extra_pip_extras: plugin-v2
- python-version: "3.11"
extra_pip_requirements: 'apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.10.txt'
extra_pip_requirements: 'apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt'
extra_pip_extras: plugin-v2
fail-fast: false
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.InputField;
import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaFieldMapper;
import java.net.URISyntaxException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class InputFieldsMapper {

public static final InputFieldsMapper INSTANCE = new InputFieldsMapper();
Expand All @@ -31,13 +34,24 @@ public com.linkedin.datahub.graphql.generated.InputFields apply(
.map(
field -> {
InputField fieldResult = new InputField();
Urn parentUrn = entityUrn;

if (field.hasSchemaField()) {
fieldResult.setSchemaField(
SchemaFieldMapper.map(context, field.getSchemaField(), entityUrn));
}
if (field.hasSchemaFieldUrn()) {
fieldResult.setSchemaFieldUrn(field.getSchemaFieldUrn().toString());
try {
parentUrn =
Urn.createFromString(field.getSchemaFieldUrn().getEntityKey().get(0));
} catch (URISyntaxException e) {
log.error(
"Field urn resolution: failed to extract parentUrn successfully from {}. Falling back to {}",
field.getSchemaFieldUrn(),
entityUrn,
e);
}
}
if (field.hasSchemaField()) {
fieldResult.setSchemaField(
SchemaFieldMapper.map(context, field.getSchemaField(), parentUrn));
}
return fieldResult;
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade.common.steps;

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import java.util.function.Function;

public class ClearSystemMetadataServiceStep implements UpgradeStep {

private final SystemMetadataService _systemMetadataService;
private final boolean _alwaysRun;

public ClearSystemMetadataServiceStep(
final SystemMetadataService systemMetadataService, final boolean alwaysRun) {
_systemMetadataService = systemMetadataService;
_alwaysRun = alwaysRun;
}

@Override
public String id() {
return "ClearSystemMetadataServiceStep";
}

@Override
public boolean skip(UpgradeContext context) {
if (_alwaysRun) {
return false;
}
if (context.parsedArgs().containsKey(NoCodeUpgrade.CLEAN_ARG_NAME)) {
return false;
}
context.report().addLine("Cleanup has not been requested.");
return true;
}

@Override
public int retryCount() {
return 1;
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
_systemMetadataService.clear();
} catch (Exception e) {
context.report().addLine("Failed to clear system metadata service", e);
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
}
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@RequiredArgsConstructor
public class GMSDisableWriteModeStep implements UpgradeStep {

private final SystemEntityClient entityClient;
private final SystemEntityClient systemEntityClient;

@Override
public String id() {
Expand All @@ -29,7 +29,7 @@ public int retryCount() {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
entityClient.setWritable(context.opContext(), false);
systemEntityClient.setWritable(context.opContext(), false);
} catch (Exception e) {
log.error("Failed to turn write mode off in GMS", e);
context.report().addLine("Failed to turn write mode off in GMS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@Slf4j
@RequiredArgsConstructor
public class GMSEnableWriteModeStep implements UpgradeStep {
private final SystemEntityClient entityClient;
private final SystemEntityClient systemEntityClient;

@Override
public String id() {
Expand All @@ -28,7 +28,7 @@ public int retryCount() {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
entityClient.setWritable(context.opContext(), true);
systemEntityClient.setWritable(context.opContext(), true);
} catch (Exception e) {
log.error("Failed to turn write mode back on in GMS", e);
context.report().addLine("Failed to turn write mode back on in GMS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,29 +27,40 @@ public class RestoreBackupConfig {
"ebeanServer",
"entityService",
"systemEntityClient",
"graphService",
"systemMetadataService",
"searchService",
"graphService",
"entityRegistry"
})
@ConditionalOnProperty(name = "entityService.impl", havingValue = "ebean", matchIfMissing = true)
@Nonnull
public RestoreBackup createInstance() {
final Database ebeanServer = applicationContext.getBean(Database.class);
final EntityService<?> entityService = applicationContext.getBean(EntityService.class);
final SystemEntityClient entityClient = applicationContext.getBean(SystemEntityClient.class);
final GraphService graphClient = applicationContext.getBean(GraphService.class);
final EntitySearchService searchClient = applicationContext.getBean(EntitySearchService.class);
final SystemEntityClient systemEntityClient =
applicationContext.getBean(SystemEntityClient.class);
final SystemMetadataService systemMetadataService =
applicationContext.getBean(SystemMetadataService.class);
final EntitySearchService entitySearchService =
applicationContext.getBean(EntitySearchService.class);
final GraphService graphService = applicationContext.getBean(GraphService.class);
final EntityRegistry entityRegistry = applicationContext.getBean(EntityRegistry.class);

return new RestoreBackup(
ebeanServer, entityService, entityRegistry, entityClient, graphClient, searchClient);
ebeanServer,
entityService,
entityRegistry,
systemEntityClient,
systemMetadataService,
entitySearchService,
graphService);
}

@Bean(name = "restoreBackup")
@ConditionalOnProperty(name = "entityService.impl", havingValue = "cassandra")
@Nonnull
public RestoreBackup createNotImplInstance() {
log.warn("restoreIndices is not supported for cassandra!");
return new RestoreBackup(null, null, null, null, null, null);
return new RestoreBackup(null, null, null, null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -20,24 +21,33 @@ public class RestoreIndicesConfig {
@Autowired ApplicationContext applicationContext;

@Bean(name = "restoreIndices")
@DependsOn({"ebeanServer", "entityService", "searchService", "graphService"})
@DependsOn({
"ebeanServer",
"entityService",
"systemMetadataService",
"searchService",
"graphService"
})
@ConditionalOnProperty(name = "entityService.impl", havingValue = "ebean", matchIfMissing = true)
@Nonnull
public RestoreIndices createInstance() {
final Database ebeanServer = applicationContext.getBean(Database.class);
final EntityService<?> entityService = applicationContext.getBean(EntityService.class);
final SystemMetadataService systemMetadataService =
applicationContext.getBean(SystemMetadataService.class);
final EntitySearchService entitySearchService =
applicationContext.getBean(EntitySearchService.class);
final GraphService graphService = applicationContext.getBean(GraphService.class);

return new RestoreIndices(ebeanServer, entityService, entitySearchService, graphService);
return new RestoreIndices(
ebeanServer, entityService, systemMetadataService, entitySearchService, graphService);
}

@Bean(name = "restoreIndices")
@ConditionalOnProperty(name = "entityService.impl", havingValue = "cassandra")
@Nonnull
public RestoreIndices createNotImplInstance() {
log.warn("restoreIndices is not supported for cassandra!");
return new RestoreIndices(null, null, null, null);
return new RestoreIndices(null, null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSystemMetadataServiceStep;
import com.linkedin.datahub.upgrade.common.steps.GMSDisableWriteModeStep;
import com.linkedin.datahub.upgrade.common.steps.GMSEnableWriteModeStep;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -26,13 +28,20 @@ public RestoreBackup(
@Nullable final Database server,
final EntityService<?> entityService,
final EntityRegistry entityRegistry,
final SystemEntityClient entityClient,
final GraphService graphClient,
final EntitySearchService searchClient) {
final SystemEntityClient systemEntityClient,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphClient) {
if (server != null) {
_steps =
buildSteps(
server, entityService, entityRegistry, entityClient, graphClient, searchClient);
server,
entityService,
entityRegistry,
systemEntityClient,
systemMetadataService,
entitySearchService,
graphClient);
} else {
_steps = List.of();
}
Expand All @@ -52,16 +61,18 @@ private List<UpgradeStep> buildSteps(
final Database server,
final EntityService<?> entityService,
final EntityRegistry entityRegistry,
final SystemEntityClient entityClient,
final GraphService graphClient,
final EntitySearchService searchClient) {
final SystemEntityClient systemEntityClient,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphClient) {
final List<UpgradeStep> steps = new ArrayList<>();
steps.add(new GMSDisableWriteModeStep(entityClient));
steps.add(new ClearSearchServiceStep(searchClient, true));
steps.add(new GMSDisableWriteModeStep(systemEntityClient));
steps.add(new ClearSystemMetadataServiceStep(systemMetadataService, true));
steps.add(new ClearSearchServiceStep(entitySearchService, true));
steps.add(new ClearGraphServiceStep(graphClient, true));
steps.add(new ClearAspectV2TableStep(server));
steps.add(new RestoreStorageStep(entityService, entityRegistry));
steps.add(new GMSEnableWriteModeStep(entityClient));
steps.add(new GMSEnableWriteModeStep(systemEntityClient));
return steps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSystemMetadataServiceStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,10 +34,13 @@ public class RestoreIndices implements Upgrade {
public RestoreIndices(
@Nullable final Database server,
final EntityService<?> entityService,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphService) {
if (server != null) {
_steps = buildSteps(server, entityService, entitySearchService, graphService);
_steps =
buildSteps(
server, entityService, systemMetadataService, entitySearchService, graphService);
} else {
_steps = List.of();
}
Expand All @@ -54,9 +59,11 @@ public List<UpgradeStep> steps() {
private List<UpgradeStep> buildSteps(
final Database server,
final EntityService<?> entityService,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphService) {
final List<UpgradeStep> steps = new ArrayList<>();
steps.add(new ClearSystemMetadataServiceStep(systemMetadataService, false));
steps.add(new ClearSearchServiceStep(entitySearchService, false));
steps.add(new ClearGraphServiceStep(graphService, false));
steps.add(new SendMAEStep(server, entityService));
Expand Down
Loading

0 comments on commit 9e9b79a

Please sign in to comment.