feat(ingest): add snowflake-summary source (#10642)
hsheth2 authored and yoonhyejin committed Jul 16, 2024
1 parent 88496af commit d197ccc
Showing 5 changed files with 247 additions and 26 deletions.
89 changes: 65 additions & 24 deletions metadata-ingestion/scripts/docgen.py
@@ -583,6 +583,12 @@ def generate(
if source and source != plugin_name:
continue

if plugin_name in {
"snowflake-summary",
}:
logger.info(f"Skipping {plugin_name} as it is on the deny list")
continue

metrics["plugins"]["discovered"] = metrics["plugins"]["discovered"] + 1 # type: ignore
# We want to attempt to load all plugins before printing a summary.
source_type = None
@@ -885,11 +891,14 @@ def generate(
os.makedirs(source_dir, exist_ok=True)
doc_file = f"{source_dir}/lineage-feature-guide.md"
with open(doc_file, "w+") as f:
f.write("import FeatureAvailability from '@site/src/components/FeatureAvailability';\n\n")
f.write(
"import FeatureAvailability from '@site/src/components/FeatureAvailability';\n\n"
)
f.write(f"# About DataHub Lineage\n\n")
f.write("<FeatureAvailability/>\n")

f.write("""
f.write(
"""
Data lineage is a **map that shows how data flows through your organization.** It details where your data originates, how it travels, and where it ultimately ends up.
This can happen within a single system (like data moving between Snowflake tables) or across various platforms.
@@ -979,58 +988,88 @@ def generate(
### Automatic Lineage Extraction Support
This is a summary of automatic lineage extraction support in our data source. Please refer to the **Important Capabilities** table in the source documentation. Note that even if the source does not support automatic extraction, you can still add lineage manually using our API & SDKs.\n""")
This is a summary of automatic lineage extraction support in our data source. Please refer to the **Important Capabilities** table in the source documentation. Note that even if the source does not support automatic extraction, you can still add lineage manually using our API & SDKs.\n"""
)

f.write("\n| Source | Table-Level Lineage | Column-Level Lineage | Related Configs |\n")
f.write(
"\n| Source | Table-Level Lineage | Column-Level Lineage | Related Configs |\n"
)
f.write("| ---------- | ------ | ----- |----- |\n")

for platform_id, platform_docs in sorted(
source_documentation.items(),
key=lambda x: (x[1]["name"].casefold(), x[1]["name"])
if "name" in x[1]
else (x[0].casefold(), x[0]),
source_documentation.items(),
key=lambda x: (x[1]["name"].casefold(), x[1]["name"])
if "name" in x[1]
else (x[0].casefold(), x[0]),
):
for plugin, plugin_docs in sorted(
platform_docs["plugins"].items(),
key=lambda x: str(x[1].get("doc_order"))
if x[1].get("doc_order")
else x[0],
platform_docs["plugins"].items(),
key=lambda x: str(x[1].get("doc_order"))
if x[1].get("doc_order")
else x[0],
):
platform_name = platform_docs['name']
platform_name = platform_docs["name"]
if len(platform_docs["plugins"].keys()) > 1:
# We only need to show this if there are multiple modules.
platform_name = f"{platform_name} `{plugin}`"

# Initialize variables
table_level_supported = "❌"
column_level_supported = "❌"
config_names = ''
config_names = ""

if "capabilities" in plugin_docs:
plugin_capabilities = plugin_docs["capabilities"]

for cap_setting in plugin_capabilities:
capability_text = get_capability_text(cap_setting.capability)
capability_supported = get_capability_supported_badge(cap_setting.supported)
capability_supported = get_capability_supported_badge(
cap_setting.supported
)

if capability_text == "Table-Level Lineage" and capability_supported == "✅":
if (
capability_text == "Table-Level Lineage"
and capability_supported == "✅"
):
table_level_supported = "✅"

if capability_text == "Column-level Lineage" and capability_supported == "✅":
if (
capability_text == "Column-level Lineage"
and capability_supported == "✅"
):
column_level_supported = "✅"

if not (table_level_supported == "❌" and column_level_supported == "❌"):
if "config_schema" in plugin_docs:
config_properties = json.loads(plugin_docs['config_schema']).get('properties', {})
config_names = '<br />'.join(
[f'- {property_name}' for property_name in config_properties if 'lineage' in property_name])
lineage_not_applicable_sources = ['azure-ad', 'csv', 'demo-data', 'dynamodb', 'iceberg', 'json-schema', 'ldap', 'openapi', 'pulsar', 'sqlalchemy' ]
if platform_id not in lineage_not_applicable_sources :
config_properties = json.loads(
plugin_docs["config_schema"]
).get("properties", {})
config_names = "<br />".join(
[
f"- {property_name}"
for property_name in config_properties
if "lineage" in property_name
]
)
lineage_not_applicable_sources = [
"azure-ad",
"csv",
"demo-data",
"dynamodb",
"iceberg",
"json-schema",
"ldap",
"openapi",
"pulsar",
"sqlalchemy",
]
if platform_id not in lineage_not_applicable_sources:
f.write(
f"| [{platform_name}](../../generated/ingestion/sources/{platform_id}.md) | {table_level_supported} | {column_level_supported} | {config_names}|\n"
)

f.write("""
f.write(
"""
### SQL Parser Lineage Extraction
@@ -1054,10 +1093,12 @@ def generate(
- [Data in Context: Lineage Explorer in DataHub](https://blog.datahubproject.io/data-in-context-lineage-explorer-in-datahub-a53a9a476dc4)
- [Harnessing the Power of Data Lineage with DataHub](https://blog.datahubproject.io/harnessing-the-power-of-data-lineage-with-datahub-ad086358dec4)
- [Data Lineage: What It Is And Why It Matters](https://blog.datahubproject.io/data-lineage-what-it-is-and-why-it-matters-1a8d9846f0bd)
""")
"""
)

print("Lineage Documentation Generation Complete")


if __name__ == "__main__":
logger.setLevel("INFO")
generate()
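
The guide text written above notes that lineage can also be added manually via DataHub's API & SDKs. As a hedged illustration (not part of this commit), here is a minimal sketch of emitting table-level lineage with the Python SDK; the dataset names and the GMS address are placeholder assumptions:

# Sketch only (not part of this commit): emitting table-level lineage with
# the DataHub Python SDK. Assumes a reachable GMS at localhost:8080; dataset
# names and URNs are illustrative placeholders.
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

upstream_urn = make_dataset_urn(platform="snowflake", name="analytics.public.orders_raw", env="PROD")
downstream_urn = make_dataset_urn(platform="snowflake", name="analytics.public.orders_clean", env="PROD")

# Declare that orders_clean is derived from orders_raw.
lineage = UpstreamLineageClass(
    upstreams=[UpstreamClass(dataset=upstream_urn, type=DatasetLineageTypeClass.TRANSFORMED)]
)
mcp = MetadataChangeProposalWrapper(entityUrn=downstream_urn, aspect=lineage)

DatahubRestEmitter(gms_server="http://localhost:8080").emit_mcp(mcp)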
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -653,6 +653,7 @@
"redshift = datahub.ingestion.source.redshift.redshift:RedshiftSource",
"slack = datahub.ingestion.source.slack.slack:SlackSource",
"snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source",
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
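The entry point above makes the plugin selectable as source type `snowflake-summary`. A minimal sketch (not part of this commit) of driving it through DataHub's Pipeline API; the connection values and time window are placeholder assumptions, and since the source emits no work units the useful output is the printed report:

# Sketch only (not part of this commit): running the new summary source via
# DataHub's Pipeline API. Credentials and account values are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "run_id": "snowflake-summary-example",
        "source": {
            "type": "snowflake-summary",
            "config": {
                "account_id": "my_account",  # assumed BaseSnowflakeConfig fields
                "username": "datahub_user",
                "password": "********",
                "start_time": "2024-07-01T00:00:00Z",
                "end_time": "2024-07-08T00:00:00Z",
            },
        },
        # The source yields no metadata work units, so the sink sees nothing;
        # the summary lives in the source report printed below.
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.pretty_print_summary()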
179 changes: 179 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_summary.py
@@ -0,0 +1,179 @@
import dataclasses
import logging
from collections import defaultdict
from typing import Dict, Iterable, List, Optional

import pydantic
from snowflake.connector import SnowflakeConnection

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import LowerCaseDatasetUrnConfigMixin
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import SupportStatus, config_class, support_status
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_schema import (
SnowflakeDatabase,
SnowflakeDataDictionary,
)
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeCommonMixin,
SnowflakeConnectionMixin,
SnowflakeQueryMixin,
)
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.utilities.lossy_collections import LossyList


class SnowflakeSummaryConfig(
BaseSnowflakeConfig, BaseTimeWindowConfig, LowerCaseDatasetUrnConfigMixin
):

# Copied from SnowflakeConfig.
database_pattern: AllowDenyPattern = AllowDenyPattern(
deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
)
schema_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for schemas to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'",
)
table_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)
view_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)
match_fully_qualified_names: bool = pydantic.Field(
default=True,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)


@dataclasses.dataclass
class SnowflakeSummaryReport(SourceReport, BaseTimeWindowReport):
filtered: LossyList[str] = dataclasses.field(default_factory=LossyList)

num_get_tables_for_schema_queries: int = 0
num_get_views_for_schema_queries: int = 0

schema_counters: Dict[str, int] = dataclasses.field(default_factory=dict)
object_counters: Dict[str, Dict[str, int]] = dataclasses.field(
default_factory=lambda: defaultdict(lambda: defaultdict(int))
)

num_snowflake_queries: Optional[int] = None
num_snowflake_mutations: Optional[int] = None

def report_dropped(self, ent_name: str) -> None:
self.filtered.append(ent_name)

def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
pass


@config_class(SnowflakeSummaryConfig)
@support_status(SupportStatus.INCUBATING)
class SnowflakeSummarySource(
SnowflakeQueryMixin,
SnowflakeConnectionMixin,
SnowflakeCommonMixin,
Source,
):
def __init__(self, ctx: PipelineContext, config: SnowflakeSummaryConfig):
super().__init__(ctx)
self.config: SnowflakeSummaryConfig = config
self.report: SnowflakeSummaryReport = SnowflakeSummaryReport()

self.data_dictionary = SnowflakeDataDictionary()
self.connection: Optional[SnowflakeConnection] = None
self.logger = logging.getLogger(__name__)

def create_connection(self) -> Optional[SnowflakeConnection]:
# TODO: Eventually we'll want to use the implementation from SnowflakeConnectionMixin,
# since it has better error reporting.
# return super().create_connection()
return self.config.get_connection()

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.connection = self.create_connection()
if self.connection is None:
return

self.data_dictionary.set_connection(self.connection)

# Databases.
databases: List[SnowflakeDatabase] = []
for database in self.get_databases() or []: # type: ignore
# TODO: Support database_patterns.
if not self.config.database_pattern.allowed(database.name):
self.report.report_dropped(f"{database.name}.*")
else:
databases.append(database)

# Schemas.
for database in databases:
self.fetch_schemas_for_database(database, database.name) # type: ignore

self.report.schema_counters[database.name] = len(database.schemas)

for schema in database.schemas:
# Tables/views.
tables = self.fetch_tables_for_schema( # type: ignore
schema, database.name, schema.name
)
views = self.fetch_views_for_schema( # type: ignore
schema, database.name, schema.name
)

self.report.object_counters[database.name][schema.name] = len(
tables
) + len(views)

# Queries for usage.
start_time_millis = self.config.start_time.timestamp() * 1000
end_time_millis = self.config.end_time.timestamp() * 1000
for row in self.query(
f"""\
SELECT COUNT(*) AS CNT
FROM snowflake.account_usage.query_history
WHERE query_history.start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_history.start_time < to_timestamp_ltz({end_time_millis}, 3)
"""
):
self.report.num_snowflake_queries = row["CNT"]

# Queries for lineage/operations.
for row in self.query(
f"""\
SELECT COUNT(*) AS CNT
FROM
snowflake.account_usage.access_history access_history
WHERE query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
AND access_history.objects_modified is not null
AND ARRAY_SIZE(access_history.objects_modified) > 0
"""
):
self.report.num_snowflake_mutations = row["CNT"]

# This source doesn't produce any metadata itself. All important information goes into the report.
yield from []

# This is a bit of a hack, but lets us reuse the code from the main ingestion source.
# Mypy doesn't really know how to deal with it though, which is why we have all these
# type ignore comments.
get_databases = SnowflakeV2Source.get_databases
get_databases_from_ischema = SnowflakeV2Source.get_databases_from_ischema
fetch_schemas_for_database = SnowflakeV2Source.fetch_schemas_for_database
fetch_tables_for_schema = SnowflakeV2Source.fetch_tables_for_schema
fetch_views_for_schema = SnowflakeV2Source.fetch_views_for_schema
get_tables_for_schema = SnowflakeV2Source.get_tables_for_schema
get_views_for_schema = SnowflakeV2Source.get_views_for_schema

def get_report(self) -> SnowflakeSummaryReport:
return self.report
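
Because the source yields no metadata work units, its output is the populated SnowflakeSummaryReport. A hedged sketch (not part of this commit) of exercising the class directly and reading the counters defined above; the connection fields are assumed placeholders inherited from BaseSnowflakeConfig:

# Sketch only (not part of this commit): instantiate the summary source
# directly and inspect the report counters it defines.
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.snowflake.snowflake_summary import (
    SnowflakeSummaryConfig,
    SnowflakeSummarySource,
)

config = SnowflakeSummaryConfig(
    account_id="my_account",  # assumed BaseSnowflakeConfig fields
    username="datahub_user",
    password="********",
    database_pattern=AllowDenyPattern(deny=[r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]),
)
source = SnowflakeSummarySource(PipelineContext(run_id="snowflake-summary-debug"), config)

# get_workunits_internal() yields nothing; it populates the report as a side effect.
for _ in source.get_workunits_internal():
    pass

report = source.get_report()
print(report.schema_counters)   # schemas per database
print(report.object_counters)   # tables + views per schema
print(report.num_snowflake_queries, report.num_snowflake_mutations)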
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py
@@ -37,7 +37,7 @@ def get_connection(self) -> SnowflakeConnection:
class SnowflakeQueryMixin:
def query(self: SnowflakeQueryProtocol, query: str) -> Any:
try:
self.logger.debug(f"Query : {query}")
self.logger.debug(f"Query : {query}", stacklevel=2)
resp = self.get_connection().cursor(DictCursor).execute(query)
return resp

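The only functional change above is passing stacklevel=2 to logger.debug, which attributes the log record to the caller of the shared query() helper instead of the helper itself. A small standard-library illustration (Python 3.8+, not part of this commit):

# Sketch only: stacklevel=2 makes the log record point at the caller.
import logging

logging.basicConfig(level=logging.DEBUG, format="%(funcName)s:%(lineno)d %(message)s")
logger = logging.getLogger(__name__)

def run_query(sql: str) -> None:
    # Without stacklevel, this record would be attributed to run_query;
    # with stacklevel=2 it is attributed to the function that called us.
    logger.debug(f"Query : {sql}", stacklevel=2)

def fetch_tables() -> None:
    run_query("SHOW TABLES")

fetch_tables()  # logged with funcName=fetch_tables, not run_query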
2 changes: 1 addition & 1 deletion (Snowflake usage config)
@@ -11,7 +11,7 @@
class SnowflakeUsageConfig(BaseUsageConfig):
email_domain: Optional[str] = pydantic.Field(
default=None,
description="Email domain of your organisation so users can be displayed on UI appropriately.",
description="Email domain of your organization so users can be displayed on UI appropriately.",
)
apply_view_usage_to_tables: bool = pydantic.Field(
default=False,
