Skip to content

Commit

Permalink
feat(bootstrap): add bootstrap step to clear out unknown aspect rows …
Browse files Browse the repository at this point in the history
…from the database (#5148)
  • Loading branch information
RyanHolstien authored Jun 21, 2022
1 parent 4c33f2d commit ba21efc
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade;
import com.linkedin.datahub.upgrade.nocodecleanup.NoCodeCleanupUpgrade;
import com.linkedin.datahub.upgrade.removeunknownaspects.RemoveUnknownAspects;
import com.linkedin.datahub.upgrade.restorebackup.RestoreBackup;
import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices;
import java.util.List;
Expand Down Expand Up @@ -44,12 +45,17 @@ private static final class Args {
@Named("restoreBackup")
private RestoreBackup restoreBackup;

@Inject
@Named("removeUnknownAspects")
private RemoveUnknownAspects removeUnknownAspects;

@Override
public void run(String... cmdLineArgs) {
_upgradeManager.register(noCodeUpgrade);
_upgradeManager.register(noCodeCleanup);
_upgradeManager.register(restoreIndices);
_upgradeManager.register(restoreBackup);
_upgradeManager.register(removeUnknownAspects);

final Args args = new Args();
new CommandLine(args).setCaseInsensitiveEnumValuesAllowed(true).parseArgs(cmdLineArgs);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.removeunknownaspects.RemoveUnknownAspects;
import com.linkedin.metadata.entity.EntityService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration that exposes the {@link RemoveUnknownAspects} upgrade as a named bean.
 */
@Configuration
public class RemoveUnknownAspectsConfig {

  /**
   * Creates the upgrade published under the bean name {@code removeUnknownAspects}.
   *
   * @param entityService entity service handed to the upgrade's constructor
   * @return a newly constructed {@link RemoveUnknownAspects}
   */
  @Bean(name = "removeUnknownAspects")
  public RemoveUnknownAspects removeUnknownAspects(EntityService entityService) {
    final RemoveUnknownAspects upgrade = new RemoveUnknownAspects(entityService);
    return upgrade;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.linkedin.datahub.upgrade.removeunknownaspects;

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.gms.factory.telemetry.TelemetryUtils;
import com.linkedin.metadata.entity.EntityService;
import java.util.HashMap;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;


/**
 * Upgrade step that hard-deletes the {@code clientId} aspect stored under
 * {@link TelemetryUtils#CLIENT_ID_URN}.
 */
@Slf4j
@RequiredArgsConstructor
public class RemoveClientIdAspectStep implements UpgradeStep {

  /** Name of the aspect this step removes. */
  private static final String INVALID_CLIENT_ID_ASPECT = "clientId";

  private final EntityService _entityService;

  /** Uses the simple class name as the step identifier. */
  @Override
  public String id() {
    return getClass().getSimpleName();
  }

  /** This step is never skipped. */
  @Override
  public boolean skip(UpgradeContext context) {
    return false;
  }

  /**
   * Returns the step body: delete the aspect with no extra conditions, then report success.
   */
  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return ctx -> {
      // Empty condition map; the final argument requests a hard delete.
      final HashMap<String, String> noConditions = new HashMap<>();
      _entityService.deleteAspect(TelemetryUtils.CLIENT_ID_URN, INVALID_CLIENT_ID_ASPECT, noConditions, true);

      final UpgradeStepResult outcome =
          new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
      return outcome;
    };
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.datahub.upgrade.removeunknownaspects;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import java.util.ArrayList;
import java.util.List;


/**
 * Upgrade whose step list is assembled once at construction time. The only step added here is
 * {@link RemoveClientIdAspectStep}; no cleanup steps are declared.
 */
public class RemoveUnknownAspects implements Upgrade {

  private final List<UpgradeStep> _steps;

  /**
   * @param entityService entity service passed on to each step that is built
   */
  public RemoveUnknownAspects(final EntityService entityService) {
    _steps = buildSteps(entityService);
  }

  /** Uses the simple class name as the upgrade identifier. */
  @Override
  public String id() {
    return getClass().getSimpleName();
  }

  /** Returns the step list created by the constructor. */
  @Override
  public List<UpgradeStep> steps() {
    return _steps;
  }

  /** This upgrade has no cleanup steps. */
  @Override
  public List<UpgradeCleanupStep> cleanupSteps() {
    return ImmutableList.of();
  }

  /** Builds the ordered list of steps for this upgrade. */
  private List<UpgradeStep> buildSteps(final EntityService entityService) {
    final UpgradeStep removeClientId = new RemoveClientIdAspectStep(entityService);
    final List<UpgradeStep> result = new ArrayList<>();
    result.add(removeClientId);
    return result;
  }
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
package com.linkedin.metadata.entity;

import com.linkedin.metadata.entity.cassandra.CassandraAspect;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import javax.annotation.Nonnull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;

/**
* This class holds values required to construct a unique key to identify an entity aspect record in a database.
* Its existence started mainly for compatibility with {@link com.linkedin.metadata.entity.ebean.EbeanAspectV2.PrimaryKey}
*/
@Value
@Slf4j
public class EntityAspectIdentifier {
@Nonnull String urn;
@Nonnull String aspect;
long version;

public static EntityAspectIdentifier fromEbean(EbeanAspectV2 ebeanAspectV2) {
return new EntityAspectIdentifier(ebeanAspectV2.getUrn(), ebeanAspectV2.getAspect(), ebeanAspectV2.getVersion());
}

public static EntityAspectIdentifier fromCassandra(CassandraAspect cassandraAspect) {
return new EntityAspectIdentifier(cassandraAspect.getUrn(), cassandraAspect.getAspect(), cassandraAspect.getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.codahale.metrics.Timer;
import com.datahub.util.RecordUtils;
import com.datahub.util.exception.ModelConversionException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -44,15 +43,10 @@
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.ArrayList;
Expand All @@ -67,11 +61,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;
import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema;
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.utils.PegasusUtils.*;


/**
Expand Down Expand Up @@ -1315,16 +1311,14 @@ public Boolean exists(Urn urn) {
}

@Nullable
public RollbackResult deleteAspect(String urn, String aspectName, Map<String, String> conditions, boolean hardDelete) {
public RollbackResult deleteAspect(String urn, String aspectName, @Nonnull Map<String, String> conditions, boolean hardDelete) {
// Validate pre-conditions before running queries
Urn entityUrn;
EntitySpec entitySpec;
try {
entityUrn = Urn.createFromString(urn);
String entityName = PegasusUtils.urnToEntityName(entityUrn);
entitySpec = getEntityRegistry().getEntitySpec(entityName);
Preconditions.checkState(entitySpec != null, String.format("Could not find entity definition for %s", entityName));
Preconditions.checkState(entitySpec.hasAspect(aspectName), String.format("Could not find aspect %s in definition for %s", aspectName, entityName));
} catch (URISyntaxException uriSyntaxException) {
// don't expect this to happen, so raising RuntimeException here
throw new RuntimeException(String.format("Failed to extract urn from %s", urn));
Expand Down Expand Up @@ -1353,7 +1347,7 @@ public RollbackResult deleteAspect(String urn, String aspectName, Map<String, St
try {
isKeyAspect = getKeyAspectName(Urn.createFromString(urn)).equals(aspectName);
} catch (URISyntaxException e) {
e.printStackTrace();
log.error("Error occurred while parsing urn: {}", urn, e);
}

// 4. Fetch all preceding aspects, that match
Expand Down Expand Up @@ -1437,6 +1431,9 @@ public RollbackResult deleteAspect(String urn, String aspectName, Map<String, St
survivingAspect == null ? ChangeType.DELETE : ChangeType.UPSERT, isKeyAspect, additionalRowsDeleted);
} catch (URISyntaxException e) {
throw new RuntimeException(String.format("Failed to emit the update for urn %s", urn));
} catch (IllegalStateException e) {
log.warn("Unable to find aspect, rollback result will not be sent. Error: {}", e.getMessage());
return null;
}
}, DEFAULT_MAX_TRANSACTION_RETRY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.AspectMigrationsDao;
import com.linkedin.metadata.entity.ListResult;
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.entity.EntityAspectIdentifier;
import com.linkedin.metadata.entity.ListResult;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
Expand All @@ -24,12 +24,6 @@
import io.ebean.Transaction;
import io.ebean.TxScope;
import io.ebean.annotation.TxIsolation;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.RollbackException;
import javax.persistence.Table;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.time.Clock;
Expand All @@ -41,8 +35,13 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.RollbackException;
import javax.persistence.Table;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
import static com.linkedin.metadata.Constants.*;

@Slf4j
public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
Expand Down Expand Up @@ -233,15 +232,13 @@ public EntityAspect getAspect(@Nonnull final EntityAspectIdentifier key) {
}

/**
 * Deletes the stored row for the given aspect.
 *
 * <p>The connection is validated first; the aspect is then converted to its
 * {@link EbeanAspectV2} form and deleted through {@code _server}.
 *
 * <p>The method returns nothing, so it carries no nullability annotation on the return.
 *
 * @param aspect the aspect record to delete; must not be null
 */
@Override
public void deleteAspect(@Nonnull final EntityAspect aspect) {
  validateConnection();
  EbeanAspectV2 ebeanAspect = EbeanAspectV2.fromEntityAspect(aspect);
  _server.delete(ebeanAspect);
}

@Override
@Nullable
public int deleteUrn(@Nonnull final String urn) {
validateConnection();
return _server.createQuery(EbeanAspectV2.class).where().eq(EbeanAspectV2.URN_COLUMN, urn).delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void start() {
try {
step.execute();
} catch (Exception e) {
log.error(String.format("Caught exception while executing bootstrap step %s. Exiting...", step.name()), e);
log.error(String.format("Caught exception while executing bootstrap step %s. Continuing...", step.name()), e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@
import com.linkedin.metadata.boot.steps.IngestPoliciesStep;
import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep;
import com.linkedin.metadata.boot.steps.IngestRootUserStep;
import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep;
import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices;
import com.linkedin.metadata.entity.AspectMigrationsDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Scope;

import javax.annotation.Nonnull;


@Configuration
@Import({EntityServiceFactory.class, EntityRegistryFactory.class, EntitySearchServiceFactory.class,
Expand Down Expand Up @@ -67,7 +67,8 @@ protected BootstrapManager createInstance() {
final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep =
new IngestDataPlatformInstancesStep(_entityService, _migrationsDao);
final RestoreGlossaryIndices restoreGlossaryIndicesStep = new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry);
final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService);
return new BootstrapManager(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep,
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep));
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep));
}
}
Loading

0 comments on commit ba21efc

Please sign in to comment.