
feat(ingest/unity): Add usage extraction; add TableReference #7910

Merged: 5 commits into datahub-project:master on May 1, 2023

Conversation

asikowitz (Collaborator) commented Apr 26, 2023

Adds Unity Catalog usage extraction.

Problems encountered:

  • Parses SQL, since the Unity Catalog API's query history endpoint makes no mention of referenced or destination tables.
  • Attempts to guess the catalog/schema when they are not provided, since the API specifies neither the current catalog nor the current schema at the time a query was run, and does not provide fully qualified table names.

Uses the sqllineage parser, with a Spark SQL parser as a fallback.

Refactors:

  • Adds a UsageAggregator class to usage_common, as I've seen this same logic duplicated multiple times (see the sketch after this list).
  • Allows a customizable user_urn_builder in usage_common, as not all Unity Catalog users are emails. We build emails from a default email_domain config in other connectors like redshift and snowflake, which seems unnecessary now?
  • Creates a TableReference for Unity Catalog and adds it to the Table dataclass, for managing string references to tables. Replaces logic, especially in lineage extraction, with these references.
  • Creates gen_dataset_urn and gen_user_urn on the Unity source to reduce duplicated code.
  • Breaks up proxy.py into implementation and types modules.
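
A rough sketch of the UsageAggregator idea, for illustration only: the class name matches the PR, but the fields, method names, and day-bucketing logic here are assumptions rather than the actual usage_common API.

```python
from collections import defaultdict
from typing import Callable, Dict, Generic, List, TypeVar

T = TypeVar("T")  # connector-specific table reference type


class UsageAggregator(Generic[T]):
    """Buckets query events by (time window, table) so usage statistics
    can be emitted once per window instead of once per query."""

    DAY_MS = 24 * 60 * 60 * 1000

    def __init__(self, user_urn_builder: Callable[[str], str]):
        # Customizable so connectors whose users are not email addresses
        # (e.g. Unity Catalog service principals) can still build user URNs.
        self.user_urn_builder = user_urn_builder
        self._buckets: Dict[int, Dict[T, List[dict]]] = defaultdict(
            lambda: defaultdict(list)
        )

    def aggregate_event(
        self, *, resource: T, start_time_ms: int, query: str, user: str
    ) -> None:
        bucket_start = start_time_ms - (start_time_ms % self.DAY_MS)
        self._buckets[bucket_start][resource].append(
            {"query": query, "user_urn": self.user_urn_builder(user)}
        )


# Example: Unity Catalog usernames are used directly; no email_domain needed.
aggregator: UsageAggregator[str] = UsageAggregator(
    user_urn_builder=lambda user: f"urn:li:corpuser:{user}"
)
aggregator.aggregate_event(
    resource="main.sales.orders",
    start_time_ms=1_682_000_000_000,
    query="SELECT * FROM main.sales.orders",
    user="service-principal-1",
)
```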

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Apr 26, 2023
```python
def spark_sql_parser(self):
    """Lazily initializes the Spark SQL parser."""
    if self._spark_sql_parser is None:
        spark_context = pyspark.SparkContext.getOrCreate()
```
Collaborator: Do we install the pyspark dependency for unity-catalog? It's not explicitly mentioned in setup.py. Is it indirectly installed via databricks-cli?

asikowitz (Author): Ah, good catch. It's part of delta-lake but not databricks.
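
A minimal sketch of the kind of fix implied here, assuming a setuptools extras layout; the package name, extra name, and dependency list are illustrative, not DataHub's actual setup.py.

```python
# Illustrative only: declare pyspark in the unity-catalog extra so the
# Spark SQL fallback parser is importable. Names are assumptions.
from setuptools import setup

setup(
    name="example-ingestion",
    extras_require={
        "unity-catalog": [
            "databricks-cli",
            "pyspark",  # needed by the Spark SQL fallback parser
        ],
    },
)
```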



```diff
-class UnityCatalogSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
+class UnityCatalogSourceConfig(
+    StatefulIngestionConfigBase, BaseUsageConfig, DatasetSourceConfigMixin
+):
```
Collaborator: BaseUsageConfig has a lot of fields. If not all of them are supported, then we should change the base class that we're using.

asikowitz (Author): We use most of them, as they get used by usage_common. The only one that doesn't get used is include_read_operational_stats, which seems to only be used by bigquery... so if anything I'd say we remove it from this common config and put it only in the bigquery one.
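
For illustration, a simplified sketch of the split being suggested; the field types and defaults here are guesses, not the exact DataHub definitions.

```python
from pydantic import BaseModel


class BaseUsageConfig(BaseModel):
    # Common usage knobs consumed by usage_common (simplified sketch).
    bucket_duration: str = "DAY"
    top_n_queries: int = 10
    include_operational_stats: bool = True


class BigQueryUsageConfig(BaseUsageConfig):
    # Moved out of the common config, since only bigquery uses it.
    include_read_operational_stats: bool = False
```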

```python
table.upstreams.setdefault(table_ref, {}).setdefault(
    column.name, []
).append(item["name"])
```
Collaborator: Nice cleanup.
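
For readers of the snippet above: the chained setdefault builds a nested mapping of upstream table to downstream column to upstream columns. A standalone illustration, with types inferred from context rather than the actual classes:

```python
from typing import Dict, List

# upstream table ref -> downstream column name -> upstream column names
upstreams: Dict[str, Dict[str, List[str]]] = {}
upstreams.setdefault("main.raw.orders", {}).setdefault("amount", []).append(
    "order_amount"
)
assert upstreams == {"main.raw.orders": {"amount": ["order_amount"]}}
```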



```python
TableMap = Dict[str, List[TableReference]]
T = TypeVar("T", bound=object)
```
Collaborator: I don't think the bound is doing anything here.

```diff
-if not self.config.table_pattern.allowed(filter_table_name):
+if not self.config.table_pattern.allowed(table.ref.qualified_table_name):
```
Collaborator: The filter_table_name isn't quite the fully qualified table name, since it doesn't include the metastore name.

asikowitz (Author): Yeah, so I made TableReference.qualified_table_name not include the metastore name, while TableReference.__str__ includes it. I'm realizing that naming can be confusing, though. Any ideas for a better one?
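
To make the distinction concrete, a minimal sketch of the two accessors as described above; the field names are assumed, not the exact PR code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableReference:
    metastore: str
    catalog: str
    schema: str
    table: str

    def __str__(self) -> str:
        # Fully qualified, including the metastore.
        return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}"

    @property
    def qualified_table_name(self) -> str:
        # Catalog-qualified, without the metastore.
        return f"{self.catalog}.{self.schema}.{self.table}"


ref = TableReference("primary-metastore", "main", "sales", "orders")
assert str(ref) == "primary-metastore.main.sales.orders"
assert ref.qualified_table_name == "main.sales.orders"
```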


```python
def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInfo]:
    try:
        runner = LineageRunner(query)
```
Collaborator: Not a huge fan of the fact that this LineageRunner logic is copy-pasted across our codebase. Ideally we'd go through a unified parser interface, where you can select the underlying parser(s) to use with an enum or something.

Collaborator: We don't need to fix it here, though.

asikowitz (Author): We have our SqlLineageSQLParser, which creates SqlLineageSQLParserImpl and eventually calls LineageAnalyzer().analyze, but there was a lot of logic in there that didn't seem necessary. I thought this was simpler overall. Agreed that we should standardize this as much as possible, but I think it'll take a bit of work.
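
A hypothetical sketch of the unified interface being suggested; this design does not exist in the codebase, and only the sqllineage branch below calls a real API (LineageRunner.source_tables).

```python
from enum import Enum
from typing import List, Optional


class SqlParserBackend(Enum):
    SQLLINEAGE = "sqllineage"
    SPARK = "spark"


def parse_source_tables(
    query: str, backends: List[SqlParserBackend]
) -> Optional[List[str]]:
    """Try each backend in order, falling back to the next on failure."""
    for backend in backends:
        try:
            if backend is SqlParserBackend.SQLLINEAGE:
                from sqllineage.runner import LineageRunner

                return [str(t) for t in LineageRunner(query).source_tables()]
            # A SPARK backend would dispatch to the Spark SQL parser here.
        except Exception:
            continue  # parse failed; try the next backend
    return None
```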

```python
spark_context = pyspark.SparkContext.getOrCreate()
spark_session = pyspark.sql.SparkSession(spark_context)
self._spark_sql_parser = (
    spark_session._jsparkSession.sessionState().sqlParser()
)
```
Collaborator: Whoa, that's cool.
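
For context, a hedged sketch of how that JVM parser can be exercised; parsePlan is part of Spark's internal ParserInterface, so this relies on private APIs and may break across Spark versions.

```python
import pyspark

spark_context = pyspark.SparkContext.getOrCreate()
spark_session = pyspark.sql.SparkSession(spark_context)
parser = spark_session._jsparkSession.sessionState().sqlParser()

# Parse without executing: the result is a JVM logical plan whose string
# form shows UnresolvedRelation nodes, i.e. the tables the query references.
plan = parser.parsePlan("SELECT id FROM main.sales.orders")
print(plan.toString())
```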

@asikowitz merged commit 5b290c9 into datahub-project:master on May 1, 2023
@asikowitz deleted the unity-catalog-usage branch on May 1, 2023 at 18:30