feat(ingest/unity): Add usage extraction; add TableReference #7910

Merged 5 commits on May 1, 2023
Changes from 1 commit
@@ -451,7 +451,9 @@ def _generate_usage_workunits(
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
urn_builder=lambda resource: resource.to_urn(self.config.env),
resource_urn_builder=lambda resource: resource.to_urn(
self.config.env
),
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
)
10 changes: 9 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -12,9 +12,12 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig


class UnityCatalogSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class UnityCatalogSourceConfig(
StatefulIngestionConfigBase, BaseUsageConfig, DatasetSourceConfigMixin

Collaborator:
BaseUsageConfig has a lot of fields; if not all of them are supported, we should change the base class we're using.

Collaborator (Author):
We use most of them, as they get used by usage_common. The only one that doesn't get used is include_read_operational_stats, which seems to only be used by bigquery... so if anything I say we remove it from this common config and put it only in the bigquery one.

):
token: str = pydantic.Field(description="Databricks personal access token")
workspace_url: str = pydantic.Field(description="Databricks workspace url")
workspace_name: Optional[str] = pydantic.Field(
@@ -65,6 +68,11 @@ class UnityCatalogSourceConfig(
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
)

include_usage_statistics: bool = Field(
default=True,
description="Generate usage statistics.",
)

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Unity Catalog Stateful Ingestion Config."
)
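
As context for the review thread above, here is a minimal sketch of the pydantic mixin composition this config relies on (pydantic v1 style, matching this file's imports). The Sketch classes are hypothetical stand-ins; the real StatefulIngestionConfigBase and BaseUsageConfig live in the modules imported at the top of config.py.

import pydantic


class UsageMixinSketch(pydantic.BaseModel):
    # Hypothetical stand-in for BaseUsageConfig: a mixin contributes fields.
    top_n_queries: int = 10
    format_sql_queries: bool = False


class StatefulMixinSketch(pydantic.BaseModel):
    enable_stateful_ingestion: bool = False


class SourceConfigSketch(StatefulMixinSketch, UsageMixinSketch):
    # The combined config inherits every mixin field, which is why the
    # reviewer asks whether all BaseUsageConfig fields are actually used.
    token: str
    include_usage_statistics: bool = True


config = SourceConfigSketch.parse_obj({"token": "dapi-...", "top_n_queries": 5})
assert config.top_n_queries == 5 and not config.enable_stateful_ingestion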
217 changes: 90 additions & 127 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -1,128 +1,32 @@
"""
Manage the communication with the Databricks server and provide equivalent dataclasses for dependent modules
"""
import datetime
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional

from databricks_cli.sdk.api_client import ApiClient
from databricks_cli.unity_catalog.api import UnityCatalogApi

from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.metadata.schema_classes import (
ArrayTypeClass,
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
MapTypeClass,
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
SchemaFieldDataTypeClass,
StringTypeClass,
TimeTypeClass,
)
from datahub.ingestion.source.unity.proxy_types import (
ALLOWED_STATEMENT_TYPES,
DATA_TYPE_REGISTRY,
Catalog,
Column,
Metastore,
Query,
QueryStatus,
Schema,
ServicePrincipal,
StatementType,
Table,
TableReference,
)
from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.metadata.schema_classes import SchemaFieldDataTypeClass

logger: logging.Logger = logging.getLogger(__name__)

# Supported types are available at
# https://api-docs.databricks.com/rest/latest/unity-catalog-api-specification-2-1.html?_ga=2.151019001.1795147704.1666247755-2119235717.1666247755

DATA_TYPE_REGISTRY: dict = {
"BOOLEAN": BooleanTypeClass,
"BYTE": BytesTypeClass,
"DATE": DateTypeClass,
"SHORT": NumberTypeClass,
"INT": NumberTypeClass,
"LONG": NumberTypeClass,
"FLOAT": NumberTypeClass,
"DOUBLE": NumberTypeClass,
"TIMESTAMP": TimeTypeClass,
"STRING": StringTypeClass,
"BINARY": BytesTypeClass,
"DECIMAL": NumberTypeClass,
"INTERVAL": TimeTypeClass,
"ARRAY": ArrayTypeClass,
"STRUCT": RecordTypeClass,
"MAP": MapTypeClass,
"CHAR": StringTypeClass,
"NULL": NullTypeClass,
}
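
This registry, which the PR relocates into proxy_types.py, maps Unity Catalog type names onto DataHub's schema type classes. A hedged sketch of the typical lookup; the exact call site is not shown in this diff:

from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeClass

# Assuming the DATA_TYPE_REGISTRY above: resolve the column's type name,
# fall back to NullTypeClass for anything unrecognized, and wrap the result
# in the SchemaFieldDataTypeClass that DataHub schema fields expect.
type_class = DATA_TYPE_REGISTRY.get("DECIMAL", NullTypeClass)
field_type = SchemaFieldDataTypeClass(type=type_class())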


@dataclass
class CommonProperty:
id: str
name: str
type: str
comment: Optional[str]


@dataclass
class Metastore(CommonProperty):
metastore_id: str
owner: Optional[str]


@dataclass
class Catalog(CommonProperty):
metastore: Metastore
owner: Optional[str]


@dataclass
class Schema(CommonProperty):
catalog: Catalog
owner: Optional[str]


@dataclass
class Column(CommonProperty):
type_text: str
type_name: SchemaFieldDataTypeClass
type_precision: int
type_scale: int
position: int
nullable: bool
comment: Optional[str]


@dataclass
class ColumnLineage:
source: str
destination: str


@dataclass
class ServicePrincipal:
id: str
application_id: str # uuid used to reference the service principal
display_name: str
active: Optional[bool]


@dataclass
class Table(CommonProperty):
schema: Schema
columns: List[Column]
storage_location: Optional[str]
data_source_format: Optional[str]
comment: Optional[str]
table_type: str
owner: Optional[str]
generation: int
created_at: datetime.datetime
created_by: str
updated_at: Optional[datetime.datetime]
updated_by: Optional[str]
table_id: str
view_definition: Optional[str]
properties: Dict[str, str]
upstreams: Dict[str, Dict[str, List[str]]] = field(default_factory=dict)

# lineage: Optional[Lineage]


class UnityCatalogApiProxy:
_unity_catalog_api: UnityCatalogApi
@@ -197,7 +101,9 @@ def tables(self, schema: Schema) -> Iterable[Table]:
)

if response.get("tables") is None:
logger.info(f"Tables not found for schema {schema.name}")
logger.info(
f"Tables not found for schema {schema.catalog.name}.{schema.name}"
)
return []

for table in response["tables"]:
@@ -217,6 +123,38 @@ def service_principals(self) -> Iterable[ServicePrincipal]:
for principal in response["Resources"]:
yield self._create_service_principal(principal)

def query_history(
self,
start_time: datetime,
end_time: datetime,
) -> Iterable[Query]:
# This is a _complete_ hack. The source code of perform_query
# bundles data into query params if method == "GET", but we need it passed as the body.
# To get around this, we set method to lowercase "get".
# I still prefer this over duplicating the code in perform_query.
method = "get"
path = "/sql/history/queries"
data: Dict[str, Any] = {
"include_metrics": False,
"max_results": 1000, # Max batch size
}
filter_by = {
"query_start_time_range": {
"start_time_ms": start_time.timestamp() * 1000,
"end_time_ms": end_time.timestamp() * 1000,
},
"statuses": [QueryStatus.FINISHED.value],
"statement_types": list(ALLOWED_STATEMENT_TYPES),
}
response: dict = self._unity_catalog_api.client.client.perform_query(
method, path, {**data, "filter_by": filter_by}
)
yield from self._create_queries(response["res"])
while response["has_next_page"]:
response = self._unity_catalog_api.client.client.perform_query(
method, path, {**data, "next_page_token": response["next_page_token"]}
)
yield from self._create_queries(response["res"])

def list_lineages_by_table(self, table_name=None, headers=None):
"""
List table lineage by table name
@@ -259,7 +197,12 @@ def table_lineage(self, table: Table) -> None:
table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}"
)
table.upstreams = {
f"{item['catalog_name']}.{item['schema_name']}.{item['name']}": {}
TableReference(
table.schema.catalog.metastore.id,
item["catalog_name"],
item["schema_name"],
item["name"],
): {}
for item in response.get("upstream_tables", [])
}
except Exception as e:
@@ -277,17 +220,15 @@ def get_column_lineage(self, table: Table) -> None:
column_name=column.name,
)
for item in response.get("upstream_cols", []):
table_name = f"{item['catalog_name']}.{item['schema_name']}.{item['table_name']}"
col_name = item["name"]
if not table.upstreams.get(table_name):
table.upstreams[table_name] = {column.name: [col_name]}
else:
if column.name in table.upstreams[table_name]:
table.upstreams[table_name][column.name].append(
col_name
)
else:
table.upstreams[table_name][column.name] = [col_name]
table_ref = TableReference(
table.schema.catalog.metastore.id,
item["catalog_name"],
item["schema_name"],
item["table_name"],
)
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])

Collaborator:

nice cleanup


except Exception as e:
logger.error(f"Error getting lineage: {e}")
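
The TableReference keys introduced above must be hashable to serve as keys of table.upstreams. A minimal sketch of the idea, assuming a frozen dataclass; the real TableReference is defined in proxy_types.py, which this diff does not show:

from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class TableReferenceSketch:
    # frozen=True makes instances hashable, so they can key the upstreams dict.
    metastore: str
    catalog: str
    schema: str
    table: str


upstreams: Dict[TableReferenceSketch, Dict[str, List[str]]] = {}
ref = TableReferenceSketch("m1", "main", "sales", "orders")
# The setdefault chain from get_column_lineage: create the per-table dict
# and per-column list on first sight, then append the upstream column name.
upstreams.setdefault(ref, {}).setdefault("amount", []).append("raw_amount")
assert upstreams[ref]["amount"] == ["raw_amount"]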
@@ -366,9 +307,9 @@ def _create_table(self, schema: Schema, obj: Any) -> Table:
properties=obj.get("properties", {}),
owner=obj.get("owner"),
generation=obj["generation"],
created_at=datetime.datetime.utcfromtimestamp(obj["created_at"] / 1000),
created_at=datetime.utcfromtimestamp(obj["created_at"] / 1000),
created_by=obj["created_by"],
updated_at=datetime.datetime.utcfromtimestamp(obj["updated_at"] / 1000)
updated_at=datetime.utcfromtimestamp(obj["updated_at"] / 1000)
if "updated_at" in obj
else None,
updated_by=obj.get("updated_by", None),
@@ -384,3 +325,25 @@ def _create_service_principal(self, obj: dict) -> ServicePrincipal:
application_id=obj["applicationId"],
active=obj.get("active"),
)

def _create_queries(self, lst: List[dict]) -> Iterable[Query]:
for obj in lst:
try:
yield self._create_query(obj)
except Exception as e:
logger.warning(f"Error parsing query: {e}")
self.report.report_warning("query-parse", str(e))

@staticmethod
def _create_query(obj: dict) -> Query:
return Query(
query_id=obj["query_id"],
query_text=obj["query_text"],
statement_type=StatementType(obj["statement_type"]),
start_time=datetime.utcfromtimestamp(obj["query_start_time_ms"] / 1000),
end_time=datetime.utcfromtimestamp(obj["query_end_time_ms"] / 1000),
user_id=obj["user_id"],
user_name=obj["user_name"],
executed_as_user_id=obj["executed_as_user_id"],
executed_as_user_name=obj["executed_as_user_name"],
)
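
The query_history method above follows a generic next_page_token paging pattern around the perform_query workaround. A hedged sketch of that pattern, with fetch_page standing in for perform_query, plus a hypothetical driver (the proxy's constructor arguments are not part of this diff):

from typing import Any, Callable, Dict, Iterable


def paged(
    fetch_page: Callable[[Dict[str, Any]], dict], base_params: Dict[str, Any]
) -> Iterable[dict]:
    # Yield every record across pages, mirroring query_history above.
    response = fetch_page(base_params)
    yield from response.get("res", [])
    while response.get("has_next_page"):
        response = fetch_page(
            {**base_params, "next_page_token": response["next_page_token"]}
        )
        yield from response.get("res", [])


# Hypothetical usage, assuming an already-constructed UnityCatalogApiProxy:
# from datetime import datetime, timedelta, timezone
# end = datetime.now(tz=timezone.utc)
# start = end - timedelta(days=1)
# for query in proxy.query_history(start, end):
#     print(query.query_id, query.user_name, query.statement_type)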