feat(ingest): enable stateful ingestion safety threshold #10516

Merged (2 commits) on May 29, 2024
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/cli/state_cli.py
@@ -34,4 +34,5 @@ def inspect(pipeline_name: str, platform: str) -> None:
         click.secho("No ingestion state found.", fg="red")
         exit(1)
 
+    logger.info(f"Found ingestion state with {len(checkpoint.state.urns)} URNs.")
     click.echo(json.dumps(checkpoint.state.urns, indent=2))
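
Note: `checkpoint.state.urns` is the full list of urns held in the checkpoint, so the new log line gives a quick count before the (potentially very long) JSON dump. A minimal standalone sketch of the resulting output, using made-up urn values and the stdlib logger in place of DataHub's module logger:

import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("datahub.cli.state_cli")

# Hypothetical checkpoint contents; real values come from the stored ingestion state.
urns = [
    "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.actor,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.film,PROD)",
]

logger.info(f"Found ingestion state with {len(urns)} URNs.")  # the new summary line
print(json.dumps(urns, indent=2))  # the existing JSON dump, unchanged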
@@ -1414,7 +1414,7 @@ def create_target_platform_mces(
             yield MetadataChangeProposalWrapper(
                 entityUrn=node_datahub_urn,
                 aspect=upstreams_lineage_class,
-            ).as_workunit()
+            ).as_workunit(is_primary_source=False)
 
     def extract_query_tag_aspects(
         self,
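The `is_primary_source=False` flag matters because stateful ingestion should only track entities this source owns: the lineage MCPs here target datasets on the dbt target platform (e.g. postgres), which a different source is responsible for. A rough, self-contained sketch of that idea, using made-up names (`Workunit`, `urns_for_checkpoint`) rather than DataHub's actual workunit API:

from dataclasses import dataclass
from typing import List

@dataclass
class Workunit:
    urn: str
    is_primary_source: bool = True  # mirrors the as_workunit() flag

def urns_for_checkpoint(workunits: List[Workunit]) -> List[str]:
    # Only entities owned by the primary source belong in the checkpoint;
    # non-primary entities must never be soft-deleted by this source.
    return [wu.urn for wu in workunits if wu.is_primary_source]

state = urns_for_checkpoint([
    Workunit("urn:li:dataset:(urn:li:dataPlatform:dbt,my_model,PROD)"),
    Workunit(
        "urn:li:dataset:(urn:li:dataPlatform:postgres,my_table,PROD)",
        is_primary_source=False,
    ),
])
assert state == ["urn:li:dataset:(urn:li:dataPlatform:dbt,my_model,PROD)"]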
@@ -37,7 +37,7 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
         description="Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
     )
     fail_safe_threshold: float = pydantic.Field(
-        default=100.0,
+        default=40.0,
         description="Prevents a large amount of soft deletes, and blocks the state from committing, if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'; this guards against accidental changes to the source configuration.",
         le=100.0,
         ge=0.0,
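
A self-contained sketch of how the new default and the existing le/ge bounds behave (a stand-in model, not DataHub's actual config class, mirroring the field definition above):

import pydantic

class StaleRemovalConfig(pydantic.BaseModel):
    # Mirrors fail_safe_threshold above, with the new 40.0 default.
    fail_safe_threshold: float = pydantic.Field(default=40.0, le=100.0, ge=0.0)

assert StaleRemovalConfig().fail_safe_threshold == 40.0  # new, stricter default
# Users who want the old permissive behavior can opt back in explicitly:
assert StaleRemovalConfig(fail_safe_threshold=100.0).fail_safe_threshold == 100.0
# Values outside [0, 100] are rejected by the le/ge validators:
try:
    StaleRemovalConfig(fail_safe_threshold=150.0)
except pydantic.ValidationError:
    print("150.0 rejected: fail_safe_threshold must be <= 100")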
@@ -224,6 +224,8 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:

         assert self.stateful_ingestion_config
 
+        copy_previous_state_and_fail = False
+
         # Check if the entity delta is below the fail-safe threshold.
         entity_difference_percent = cur_checkpoint_state.get_percent_entities_changed(
             last_checkpoint_state
@@ -242,30 +244,32 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
                 f"Will not soft-delete entities, since we'd be deleting {entity_difference_percent:.1f}% of the existing entities. "
                 f"To force a deletion, increase the value of 'stateful_ingestion.fail_safe_threshold' (currently {self.stateful_ingestion_config.fail_safe_threshold})",
             )
-            return
+            copy_previous_state_and_fail = True
 
         if self.source.get_report().events_produced == 0:
             # SUBTLE: By reporting this as a failure here, we also ensure that the
             # new (empty) state doesn't get committed.
             # TODO: Move back to using fail_safe_threshold once we're confident that we've squashed all the bugs.
             self.source.get_report().report_failure(
                 "stale-entity-removal",
                 "Skipping stale entity soft-deletion because the source produced no events. "
                 "This is a fail-safe mechanism to prevent accidental deletion of all entities.",
             )
-            return
+            copy_previous_state_and_fail = True
 
         # If the source already had a failure, skip soft-deletion.
-        # TODO: Eventually, switch this to check if anything in the pipeline had a failure so far.
+        # TODO: Eventually, switch this to check if anything in the pipeline had a failure so far, not just the source.
         if self.source.get_report().failures:
-            for urn in last_checkpoint_state.get_urns_not_in(
-                type="*", other_checkpoint_state=cur_checkpoint_state
-            ):
-                self.add_entity_to_state("", urn)
             self.source.get_report().report_warning(
                 "stale-entity-removal",
-                "Skipping stale entity soft-deletion and coping urns from last state since source already had failures.",
+                "Skipping stale entity soft-deletion and copying urns from last state since source already had failures.",
             )
-            return
+            copy_previous_state_and_fail = True
 
+        if copy_previous_state_and_fail:
+            logger.info(
+                f"Copying urns from last state (size {len(last_checkpoint_state.urns)}) to current state (size {len(cur_checkpoint_state.urns)}) "
+                "to ensure stale entities from previous runs are deleted on the next successful run."
+            )
+            for urn in last_checkpoint_state.urns:
+                self.add_entity_to_state("", urn)
+            return
 
         # Everything looks good, emit the soft-deletion workunits
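To summarize the control-flow change: the three guards no longer return early; each sets a single flag, and one tail block copies every urn from the previous checkpoint into the current one before failing the run, so a later healthy run can still soft-delete them. A simplified, self-contained sketch of that pattern (the names and signature are illustrative, not the handler's real API):

from typing import Set

def stale_urns_to_delete(
    previous_urns: Set[str],
    current_urns: Set[str],
    percent_changed: float,
    fail_safe_threshold: float,
    events_produced: int,
    source_failed: bool,
) -> Set[str]:
    # Mirrors the three guards above: too much churn, an empty run, or an
    # upstream failure all trip the same flag instead of returning early.
    copy_previous_state_and_fail = (
        percent_changed > fail_safe_threshold
        or events_produced == 0
        or source_failed
    )
    if copy_previous_state_and_fail:
        # Carry the old state forward and delete nothing; the run is marked
        # failed, so this checkpoint will not be trusted as "successful".
        current_urns.update(previous_urns)
        return set()
    # Healthy run: anything in the old state but not the new one is stale.
    return previous_urns - current_urns

# A 50% change trips a 40% threshold: nothing is deleted, the state is kept.
prev, cur = {"urn:a", "urn:b"}, {"urn:a"}
assert stale_urns_to_delete(prev, cur, 50.0, 40.0, 10, False) == set()
assert cur == {"urn:a", "urn:b"}
# Below the threshold, the stale urn is returned for soft-deletion:
assert stale_urns_to_delete({"urn:a", "urn:b"}, {"urn:a"}, 30.0, 40.0, 10, False) == {"urn:b"}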
Golden test file update: the three status aspects below, for the dbt target-platform (postgres) datasets, are no longer emitted, consistent with those lineage workunits now being marked is_primary_source=False.

@@ -4258,54 +4258,6 @@
             "lastRunId": "no-run-id-provided"
         }
     },
-    {
-        "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.an-aliased-view-for-monthly-billing,PROD)",
-        "changeType": "UPSERT",
-        "aspectName": "status",
-        "aspect": {
-            "json": {
-                "removed": false
-            }
-        },
-        "systemMetadata": {
-            "lastObserved": 1643871600000,
-            "runId": "dbt-test-with-non-incremental-lineage",
-            "lastRunId": "no-run-id-provided"
-        }
-    },
-    {
-        "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
-        "changeType": "UPSERT",
-        "aspectName": "status",
-        "aspect": {
-            "json": {
-                "removed": false
-            }
-        },
-        "systemMetadata": {
-            "lastObserved": 1643871600000,
-            "runId": "dbt-test-with-non-incremental-lineage",
-            "lastRunId": "no-run-id-provided"
-        }
-    },
-    {
-        "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
-        "changeType": "UPSERT",
-        "aspectName": "status",
-        "aspect": {
-            "json": {
-                "removed": false
-            }
-        },
-        "systemMetadata": {
-            "lastObserved": 1643871600000,
-            "runId": "dbt-test-with-non-incremental-lineage",
-            "lastRunId": "no-run-id-provided"
-        }
-    },
     {
         "entityType": "tag",
         "entityUrn": "urn:li:tag:dbt:column_tag",