-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): add more fail-safes to stateful ingestion #8111
Conversation
Stacked on top of the changes in datahub-project#8104. Review that PR first.
ac071e1
to
ff4bad3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One question but lgtm
self.job_id, self.state_type_class | ||
) | ||
if not last_checkpoint: | ||
return | ||
cur_checkpoint = self.state_provider.get_current_checkpoint(self.job_id) | ||
assert cur_checkpoint is not None | ||
# Get the underlying states | ||
last_checkpoint_state = cast(GenericCheckpointState, last_checkpoint.state) | ||
last_checkpoint_state: GenericCheckpointState = last_checkpoint.state | ||
cur_checkpoint_state = cast(GenericCheckpointState, cur_checkpoint.state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove the cast here too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope - get_current_checkpoint doesn't infer types in the same way
|
||
# If the source already had a failure, skip soft-deletion. | ||
# TODO: Eventually, switch this to check if anything in the pipeline had a failure so far. | ||
if self.source.get_report().failures: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this take into account the commit_policy
? Or for stale entity removal the rule is just commit iff there are no failures
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the latter
in the commit policy rules should be tied to each stateful ingestion handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does that mean we're dropping support for a configurable commit policy? Right now it seems like it doesn't really do anything
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eventually yes I'd like to drop support for that
Stacked on top of the changes in #8104. Review that PR first.
Checklist