-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bootstrap): add bootstrap step to clear out unknown aspect rows from the database #5148
Changes from all commits
2e0c703
503023c
7adcdcd
8737c90
e943ebd
c2824d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package com.linkedin.datahub.upgrade.config; | ||
|
||
import com.linkedin.datahub.upgrade.removeunknownaspects.RemoveUnknownAspects; | ||
import com.linkedin.metadata.entity.EntityService; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
|
||
|
||
@Configuration | ||
public class RemoveUnknownAspectsConfig { | ||
@Bean(name = "removeUnknownAspects") | ||
public RemoveUnknownAspects removeUnknownAspects(EntityService entityService) { | ||
return new RemoveUnknownAspects(entityService); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package com.linkedin.datahub.upgrade.removeunknownaspects; | ||
|
||
import com.linkedin.datahub.upgrade.UpgradeContext; | ||
import com.linkedin.datahub.upgrade.UpgradeStep; | ||
import com.linkedin.datahub.upgrade.UpgradeStepResult; | ||
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; | ||
import com.linkedin.gms.factory.telemetry.TelemetryUtils; | ||
import com.linkedin.metadata.entity.EntityService; | ||
import java.util.HashMap; | ||
import java.util.function.Function; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
|
||
@Slf4j | ||
@RequiredArgsConstructor | ||
public class RemoveClientIdAspectStep implements UpgradeStep { | ||
|
||
private static final String INVALID_CLIENT_ID_ASPECT = "clientId"; | ||
|
||
private final EntityService _entityService; | ||
|
||
@Override | ||
public String id() { | ||
return this.getClass().getSimpleName(); | ||
} | ||
|
||
@Override | ||
public boolean skip(UpgradeContext context) { | ||
return false; | ||
} | ||
|
||
@Override | ||
public Function<UpgradeContext, UpgradeStepResult> executable() { | ||
return upgradeContext -> { | ||
_entityService.deleteAspect(TelemetryUtils.CLIENT_ID_URN, INVALID_CLIENT_ID_ASPECT, | ||
new HashMap<>(), true); | ||
return (UpgradeStepResult) new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package com.linkedin.datahub.upgrade.removeunknownaspects; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import com.linkedin.datahub.upgrade.Upgrade; | ||
import com.linkedin.datahub.upgrade.UpgradeCleanupStep; | ||
import com.linkedin.datahub.upgrade.UpgradeStep; | ||
import com.linkedin.metadata.entity.EntityService; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
|
||
public class RemoveUnknownAspects implements Upgrade { | ||
|
||
private final List<UpgradeStep> _steps; | ||
|
||
public RemoveUnknownAspects(final EntityService entityService) { | ||
_steps = buildSteps(entityService); | ||
} | ||
|
||
@Override | ||
public String id() { | ||
return this.getClass().getSimpleName(); | ||
} | ||
|
||
@Override | ||
public List<UpgradeStep> steps() { | ||
return _steps; | ||
} | ||
|
||
private List<UpgradeStep> buildSteps(final EntityService entityService) { | ||
final List<UpgradeStep> steps = new ArrayList<>(); | ||
steps.add(new RemoveClientIdAspectStep(entityService)); | ||
return steps; | ||
} | ||
|
||
@Override | ||
public List<UpgradeCleanupStep> cleanupSteps() { | ||
return ImmutableList.of(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,28 @@ | ||
package com.linkedin.metadata.entity; | ||
|
||
import com.linkedin.metadata.entity.cassandra.CassandraAspect; | ||
import com.linkedin.metadata.entity.ebean.EbeanAspectV2; | ||
import javax.annotation.Nonnull; | ||
import lombok.Value; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import javax.annotation.Nonnull; | ||
|
||
/** | ||
* This class holds values required to construct a unique key to identify an entity aspect record in a database. | ||
* Its existence started mainly for compatibility with {@link com.linkedin.metadata.entity.ebean.EbeanAspectV2.PrimaryKey} | ||
*/ | ||
@Value | ||
@Slf4j | ||
public class EntityAspectIdentifier { | ||
@Nonnull String urn; | ||
@Nonnull String aspect; | ||
long version; | ||
|
||
public static EntityAspectIdentifier fromEbean(EbeanAspectV2 ebeanAspectV2) { | ||
return new EntityAspectIdentifier(ebeanAspectV2.getUrn(), ebeanAspectV2.getAspect(), ebeanAspectV2.getVersion()); | ||
} | ||
|
||
public static EntityAspectIdentifier fromCassandra(CassandraAspect cassandraAspect) { | ||
return new EntityAspectIdentifier(cassandraAspect.getUrn(), cassandraAspect.getAspect(), cassandraAspect.getVersion()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,6 @@ | |
import com.codahale.metrics.Timer; | ||
import com.datahub.util.RecordUtils; | ||
import com.datahub.util.exception.ModelConversionException; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.collect.ImmutableSet; | ||
import com.google.common.collect.Iterators; | ||
|
@@ -44,15 +43,10 @@ | |
import com.linkedin.metadata.utils.PegasusUtils; | ||
import com.linkedin.metadata.utils.metrics.MetricUtils; | ||
import com.linkedin.mxe.MetadataAuditOperation; | ||
import com.linkedin.mxe.MetadataChangeProposal; | ||
import com.linkedin.mxe.MetadataChangeLog; | ||
import com.linkedin.mxe.MetadataChangeProposal; | ||
import com.linkedin.mxe.SystemMetadata; | ||
import com.linkedin.util.Pair; | ||
import lombok.Value; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import java.net.URISyntaxException; | ||
import java.sql.Timestamp; | ||
import java.util.ArrayList; | ||
|
@@ -67,11 +61,13 @@ | |
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import lombok.Value; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION; | ||
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR; | ||
import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema; | ||
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName; | ||
import static com.linkedin.metadata.Constants.*; | ||
import static com.linkedin.metadata.utils.PegasusUtils.*; | ||
|
||
|
||
/** | ||
|
@@ -1315,16 +1311,14 @@ public Boolean exists(Urn urn) { | |
} | ||
|
||
@Nullable | ||
public RollbackResult deleteAspect(String urn, String aspectName, Map<String, String> conditions, boolean hardDelete) { | ||
public RollbackResult deleteAspect(String urn, String aspectName, @Nonnull Map<String, String> conditions, boolean hardDelete) { | ||
// Validate pre-conditions before running queries | ||
Urn entityUrn; | ||
EntitySpec entitySpec; | ||
try { | ||
entityUrn = Urn.createFromString(urn); | ||
String entityName = PegasusUtils.urnToEntityName(entityUrn); | ||
entitySpec = getEntityRegistry().getEntitySpec(entityName); | ||
Preconditions.checkState(entitySpec != null, String.format("Could not find entity definition for %s", entityName)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why remove these? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One of them doesn't actually do anything since it will never be triggered and the other one prevents the delete from going through. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would it prevent the delete? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we always populate the "aspectName" field? |
||
Preconditions.checkState(entitySpec.hasAspect(aspectName), String.format("Could not find aspect %s in definition for %s", aspectName, entityName)); | ||
} catch (URISyntaxException uriSyntaxException) { | ||
// don't expect this to happen, so raising RuntimeException here | ||
throw new RuntimeException(String.format("Failed to extract urn from %s", urn)); | ||
|
@@ -1353,7 +1347,7 @@ public RollbackResult deleteAspect(String urn, String aspectName, Map<String, St | |
try { | ||
isKeyAspect = getKeyAspectName(Urn.createFromString(urn)).equals(aspectName); | ||
} catch (URISyntaxException e) { | ||
e.printStackTrace(); | ||
log.error("Error occurred while parsing urn: {}", urn, e); | ||
} | ||
|
||
// 4. Fetch all preceding aspects, that match | ||
|
@@ -1437,6 +1431,9 @@ public RollbackResult deleteAspect(String urn, String aspectName, Map<String, St | |
survivingAspect == null ? ChangeType.DELETE : ChangeType.UPSERT, isKeyAspect, additionalRowsDeleted); | ||
} catch (URISyntaxException e) { | ||
throw new RuntimeException(String.format("Failed to emit the update for urn %s", urn)); | ||
} catch (IllegalStateException e) { | ||
log.warn("Unable to find aspect, rollback result will not be sent. Error: {}", e.getMessage()); | ||
RyanHolstien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return null; | ||
} | ||
}, DEFAULT_MAX_TRANSACTION_RETRY); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really dislike the naming inconsistency of
EbeanAspectV2
andCassandraAspect
:( Seems very strange that this class needs to know about specific DB implementations, also.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a utility conversion from the lower level aspect, made more sense here to me tied to the actual higher level object it will always be used with than in a separate utility class that will never be found by others trying to do the same thing.