Skip to content

Commit

Permalink
feat(ingest): tweak stale entity removal messaging (#11064)
Browse files (browse the repository at this point in the history)
  • Loading branch information
hsheth2 authored Aug 6, 2024
1 parent 0400785 commit 832093a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def get_percent_entities_changed(
new_entities=self.urns, old_entities=old_checkpoint_state.urns
)

# Returns how many urns this state object currently holds, i.e. the length of self.urns.
def urn_count(self) -> int:
return len(self.urns)


def compute_percent_entities_changed(
new_entities: List[str], old_entities: List[str]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
description="Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
fail_safe_threshold: float = pydantic.Field(
default=40.0,
default=75.0,
description="Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
le=100.0,
ge=0.0,
Expand Down Expand Up @@ -257,13 +257,33 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:

assert self.stateful_ingestion_config

copy_previous_state_and_fail = False
copy_previous_state_and_exit = False

# If the source already had a failure, skip soft-deletion.
# TODO: Eventually, switch this to check if anything in the pipeline had a failure so far, not just the source.
if self.source.get_report().failures:
self.source.get_report().report_warning(
title="Skipping stateful ingestion / stale entity removal",
message="The soft-deletion of stale entities will be skipped because the source reported a failure.",
)
copy_previous_state_and_exit = True

if (
not copy_previous_state_and_exit
and self.source.get_report().events_produced == 0
):
self.source.get_report().report_failure(
title="Skipping stateful ingestion / stale entity removal",
message="The source did not produce any metadata. Despite stateful ingestion being enabled, we will not delete any metadata. "
"This is a fail-safe mechanism to prevent the accidental deletion of all entities.",
)
copy_previous_state_and_exit = True

# Check if the entity delta is below the fail-safe threshold.
entity_difference_percent = cur_checkpoint_state.get_percent_entities_changed(
last_checkpoint_state
)
if (
if not copy_previous_state_and_exit and (
entity_difference_percent
> self.stateful_ingestion_config.fail_safe_threshold
# Adding this check to protect against cases where get_percent_entities_changed returns over 100%.
Expand All @@ -273,30 +293,21 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
):
# Log the failure. This would prevent the current state from getting committed.
self.source.get_report().report_failure(
"stale-entity-removal",
f"Will not soft-delete entities, since we'd be deleting {entity_difference_percent:.1f}% of the existing entities. "
f"To force a deletion, increase the value of 'stateful_ingestion.fail_safe_threshold' (currently {self.stateful_ingestion_config.fail_safe_threshold})",
)
copy_previous_state_and_fail = True

if self.source.get_report().events_produced == 0:
self.source.get_report().report_failure(
"stale-entity-removal",
"Skipping stale entity soft-deletion because the source produced no events. "
"This is a fail-safe mechanism to prevent accidental deletion of all entities.",
)
copy_previous_state_and_fail = True

# If the source already had a failure, skip soft-deletion.
# TODO: Eventually, switch this to check if anything in the pipeline had a failure so far, not just the source.
if self.source.get_report().failures:
self.source.get_report().report_warning(
"stale-entity-removal",
"Skipping stale entity soft-deletion and copying urns from last state since source already had failures.",
title="Skipping stateful ingestion / stale entity removal",
message=f"\
The previous run produced {last_checkpoint_state.urn_count()} entities, whereas this run produced {cur_checkpoint_state.urn_count()} entities. \
Comparing the entities produced this run vs the previous run, we would be deleting {entity_difference_percent:.1f}% of the entities produced by the previous run. \
This percentage is above the threshold (currently {self.stateful_ingestion_config.fail_safe_threshold}), so we will skip soft-deleting stale entities.\
\
To update this threshold, add this to your recipe: \
\
stateful_ingestion:\
fail_safe_threshold: <new value>\
",
)
copy_previous_state_and_fail = True
copy_previous_state_and_exit = True

if copy_previous_state_and_fail:
if copy_previous_state_and_exit:
logger.info(
f"Copying urns from last state (size {len(last_checkpoint_state.urns)}) to current state (size {len(cur_checkpoint_state.urns)}) "
"to ensure stale entities from previous runs are deleted on the next successful run."
Expand Down

0 comments on commit 832093a

Please sign in to comment.