feat(ingest): Create zero usage aspects #8205
Conversation
```python
resource: str = f"{event.database}.{event.schema_}.{event.table}".lower()
```
lower() was previously called in _make_usage_stat.
```
@@ -23,7 +23,6 @@ class UnityCatalogReport(StaleEntityRemovalSourceReport):
    num_queries_parsed_by_spark_plan: int = 0

    num_operational_stats_workunits_emitted: int = 0
    num_usage_workunits_emitted: int = 0
```
Redundant with workunit reporter
```
@@ -294,17 +283,6 @@
    },
    "nativeDataType": "VARCHAR(255)",
    "recursive": false,
    "glossaryTerms": {
```
Not sure why these are coming up on my local machine, but not failing on CI... @mayurinehate any clue here? This started failing locally since ac06cf3
Can you try creating a fresh venv once (or use acryl-datahub-classify==0.0.8)? If that doesn't work, this needs to be fixed in code, maybe by increasing the number of sample values set in mock_sample_values.return_value.
Nice catch. Didn't realize classify code was in a separate package
```python
*,
dataset_urns: Set[str],
config: BaseTimeWindowConfig,
all_buckets: bool = False,  # TODO: Enable when CREATE changeType is supported for timeseries aspects
```
Whenever we enable this, we should check the start_time and end_time defaults in config more carefully. I'm especially concerned that we may miss some usage for the bucket if the UPSERT is disabled.
With the current defaults (without stateful ingestion), the entire usage up until the ingestion time is emitted in today's run, and it gets overwritten when usage runs again tomorrow with the entire yesterday window.
Yeah, this seems like a pretty serious issue. I guess the best solution here is to set the start and end time correctly, to the start and end of the nearest bucket.
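To make that concrete, here's a minimal sketch of the alignment, assuming day-sized buckets; the function name and exact rounding policy are illustrative, not what this PR implements:

```python
from datetime import datetime, timedelta
from typing import Tuple

def align_to_day_buckets(start_time: datetime, end_time: datetime) -> Tuple[datetime, datetime]:
    """Snap the window to whole-day bucket boundaries so a rerun tomorrow
    regenerates exactly the same buckets instead of splitting a partial one."""
    start = start_time.replace(hour=0, minute=0, second=0, microsecond=0)
    end = end_time.replace(hour=0, minute=0, second=0, microsecond=0)
    if end < end_time:
        # Round the end up so the current (partial) bucket is either fully
        # included or fully excluded, never half-written.
        end += timedelta(days=1)
    return start, end
```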
Looks good overall. Some minor suggestions.
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
```
@@ -368,12 +373,6 @@ def _get_operation_aspect_work_unit(
        )
        continue

    dataset_urn = make_dataset_urn_with_platform_instance(
```
Thank you for cleaning this up.
```python
):
    self.report.num_usage_workunits_emitted += 1
    yield wu
yield from auto_empty_dataset_usage_statistics(
```
Noticed that the call to auto_empty_dataset_usage_statistics is in get_usage_workunits in some sources, whereas it's in _get_workunits_internal in others. Would be good to keep it consistent.
This is due to inconsistency in how the include_usage_stats param is interpreted. For Snowflake, if include_usage_stats is false you can still ingest operational stats, while for BigQuery, Redshift, and Unity, if include_usage_statistics is false then we don't ingest usage at all. We should standardize here, but I don't know which is best.
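To illustrate the two interpretations side by side (names and structure are simplified stand-ins, not the actual source code):

```python
def emit_operation_aspects():
    yield "operation-aspect"  # stand-in for real operational stats workunits

def emit_usage_statistics():
    yield "usage-statistics"  # stand-in for real usage aspects

# Snowflake-style: the flag gates only usage statistics; operational stats
# can still be emitted when it is off.
def snowflake_style(include_usage_stats: bool, include_operational_stats: bool):
    if include_operational_stats:
        yield from emit_operation_aspects()
    if include_usage_stats:
        yield from emit_usage_statistics()

# BigQuery/Redshift/Unity-style: the flag gates the whole extractor, so
# turning it off drops operational stats as well.
def bigquery_style(include_usage_statistics: bool):
    if not include_usage_statistics:
        return
    yield from emit_operation_aspects()
    yield from emit_usage_statistics()
```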
```python
workunits = usage_extractor._get_workunits_internal(
    events, [TABLE_REFS[TABLE_1.name]]
)
assert list(workunits) == [
```
Curious why this changed from TABLE_REFS.values() to [TABLE_REFS[TABLE_1.name]]?
The refs are used to determine which entities need zero usage, and I wanted to keep this test simple (i.e. smaller expected output).
```python
urn = wu.get_urn()
if guess_entity_type(urn) == DatasetUrn.ENTITY_TYPE:
    dataset_urns.add(urn)
```
Looks like we are already passing a well-populated set of dataset_urns to this helper function from our sources. This line adds all the dataset urns again - kind of redundant, but a safety net against whatever the connector may have missed?
Ah, this was left in from back when I was not passing them in (before I realized this was not guaranteed, e.g. when include_technical_schema is False for Snowflake). I think it's pretty harmless to keep, and the safety net may be nice.
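For reference, the safety-net pattern boils down to something like this (a simplified sketch, not the helper's real signature):

```python
from typing import Iterable, Set

def collect_dataset_urns(reported: Set[str], workunit_urns: Iterable[str]) -> Set[str]:
    """Union of what the connector reported up front with whatever actually
    flowed through the workunit stream; the second term is the safety net."""
    seen = set(reported)
    for urn in workunit_urns:
        if urn.startswith("urn:li:dataset:"):
            seen.add(urn)
    return seen
```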
…gquery.py Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
…ift.py Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Overall looks good
"""Returns list of timestamps for each DatasetUsageStatistics bucket. | ||
|
||
Includes all buckets in the time window, including partially contained buckets. | ||
The timestamps are in milliseconds since the epoch. |
A more general thing: I much prefer using/passing around proper datetime objects instead of timestamps - they're just easier to use and operate on.
Agreed, I'll change
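A sketch of what that change could look like, assuming the window start is already bucket-aligned, converting to epoch millis only at the aspect boundary (names are illustrative):

```python
from datetime import datetime, timedelta
from typing import List

def buckets(start: datetime, end: datetime, size: timedelta = timedelta(days=1)) -> List[datetime]:
    """Bucket start times covering [start, end), kept as datetime objects."""
    out = []
    current = start
    while current < end:
        out.append(current)
        current += size
    return out

def to_millis(dt: datetime) -> int:
    # Convert only when serializing into the DatasetUsageStatistics aspect.
    return int(dt.timestamp() * 1000)
```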
```python
def majority_buckets(self) -> List[int]:
    """Returns list of timestamps for each DatasetUsageStatistics bucket.

    Includes only buckets in the time window for which a majority of the bucket is ingested.
```
Just to confirm: majority = half or more?
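If majority does mean half or more, the inclusion test would look roughly like this (the boundary handling is my assumption, not confirmed by the PR):

```python
from datetime import datetime, timedelta

def majority_in_window(bucket_start: datetime, bucket_size: timedelta,
                       window_start: datetime, window_end: datetime) -> bool:
    """True if at least half of the bucket falls inside the window."""
    bucket_end = bucket_start + bucket_size
    overlap = min(bucket_end, window_end) - max(bucket_start, window_start)
    return overlap * 2 >= bucket_size
```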
```python
if usage_aspect.timestampMillis in buckets:
    usage_statistics_urns[usage_aspect.timestampMillis].add(urn)
elif all_buckets:
    logger.warning(
```
I doubt this ever gets hit, but if it does, it'll be a log line per aspect, which will probably be super spammy.
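One way to keep it quiet, sketched under the assumption that the caller can batch up the offenders: collect the unexpected timestamps and warn once at the end instead of per aspect:

```python
import logging
from collections import Counter
from typing import Iterable

logger = logging.getLogger(__name__)

def warn_once_about_unexpected_buckets(unexpected_timestamps: Iterable[int]) -> None:
    """Aggregate out-of-window bucket timestamps into a single warning."""
    counts = Counter(unexpected_timestamps)
    if counts:
        logger.warning(
            "Dropped %d usage aspects with unexpected bucket timestamps: %s",
            sum(counts.values()),
            sorted(counts),
        )
```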
```python
    fieldCounts=[],
),
changeType=ChangeTypeClass.CREATE
if all_buckets
```
So all_buckets doesn't work, since we don't support CREATE?
Yeah. Normally I don't like leaving dead code in, but I would really like to get CREATE in and think it's the better way to implement this feature, so hopefully I'm able to get that in and switch this over soon.
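For context, the selection under discussion reduces to this (the surrounding structure is simplified; ChangeTypeClass is the real datahub class, though the import path is my assumption):

```python
from datahub.metadata.schema_classes import ChangeTypeClass

def change_type(all_buckets: bool) -> str:
    # all_buckets is currently hardcoded to False, so the CREATE branch is
    # effectively dead code until CREATE is supported for timeseries aspects.
    return ChangeTypeClass.CREATE if all_buckets else ChangeTypeClass.UPSERT
```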
Contains a lot more refactoring than I would like. Tried standardizing each usage source:
- Added a dataset_urn_builder method to unify dataset urn creation logic between source and usage extractor (see the sketch below). I don't like this too much, esp with tests, but think it's preferred to the old way, which was just matched logic between the two.
- Standardized on get_usage_workunits and helper _get_workunits_internal.
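A hedged sketch of the dataset_urn_builder idea; the signature is illustrative, not the exact one in this PR:

```python
from typing import Callable

def make_dataset_urn_builder(platform: str, env: str) -> Callable[[str], str]:
    """Built once by the source and handed to the usage extractor, so both
    sides produce identical dataset urns from the same inputs."""
    def builder(name: str) -> str:
        return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"
    return builder
```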
Checklist