Skip to content

Commit

Permalink
fix(ingestion/glue): add missing config options in profiling
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Jul 8, 2024
1 parent 54b9d98 commit 467b489
Show file tree
Hide file tree
Showing 6 changed files with 1,796 additions and 1,274 deletions.
118 changes: 69 additions & 49 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field as dataclass_field
from functools import lru_cache
from typing import (
Expand Down Expand Up @@ -158,8 +159,8 @@ class GlueSourceConfig(
default=False,
description="If an S3 Objects Tags should be created for the Tables ingested by Glue.",
)
profiling: Optional[GlueProfilingConfig] = Field(
default=None,
profiling: GlueProfilingConfig = Field(
default_factory=GlueProfilingConfig,
description="Configs to ingest data profiles from glue table",
)
# Custom Stateful Ingestion settings
Expand All @@ -172,7 +173,7 @@ class GlueSourceConfig(
)

def is_profiling_enabled(self) -> bool:
return self.profiling is not None and is_profiling_enabled(
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

Expand Down Expand Up @@ -784,34 +785,39 @@ def _create_profile_mcp(
# instantiate column profile class for each column
column_profile = DatasetFieldProfileClass(fieldPath=column_name)

if self.source_config.profiling.unique_count in column_params:
column_profile.uniqueCount = int(
float(column_params[self.source_config.profiling.unique_count])
)
if self.source_config.profiling.unique_proportion in column_params:
column_profile.uniqueProportion = float(
column_params[self.source_config.profiling.unique_proportion]
)
if self.source_config.profiling.null_count in column_params:
column_profile.nullCount = int(
float(column_params[self.source_config.profiling.null_count])
)
if self.source_config.profiling.null_proportion in column_params:
column_profile.nullProportion = float(
column_params[self.source_config.profiling.null_proportion]
)
if self.source_config.profiling.min in column_params:
column_profile.min = column_params[self.source_config.profiling.min]
if self.source_config.profiling.max in column_params:
column_profile.max = column_params[self.source_config.profiling.max]
if self.source_config.profiling.mean in column_params:
column_profile.mean = column_params[self.source_config.profiling.mean]
if self.source_config.profiling.median in column_params:
column_profile.median = column_params[
self.source_config.profiling.median
]
if self.source_config.profiling.stdev in column_params:
column_profile.stdev = column_params[self.source_config.profiling.stdev]
if not self.source_config.profiling.profile_table_level_only:
if self.source_config.profiling.unique_count in column_params:
column_profile.uniqueCount = int(
float(column_params[self.source_config.profiling.unique_count])
)
if self.source_config.profiling.unique_proportion in column_params:
column_profile.uniqueProportion = float(
column_params[self.source_config.profiling.unique_proportion]
)
if self.source_config.profiling.null_count in column_params:
column_profile.nullCount = int(
float(column_params[self.source_config.profiling.null_count])
)
if self.source_config.profiling.null_proportion in column_params:
column_profile.nullProportion = float(
column_params[self.source_config.profiling.null_proportion]
)
if self.source_config.profiling.min in column_params:
column_profile.min = column_params[self.source_config.profiling.min]
if self.source_config.profiling.max in column_params:
column_profile.max = column_params[self.source_config.profiling.max]
if self.source_config.profiling.mean in column_params:
column_profile.mean = column_params[
self.source_config.profiling.mean
]
if self.source_config.profiling.median in column_params:
column_profile.median = column_params[
self.source_config.profiling.median
]
if self.source_config.profiling.stdev in column_params:
column_profile.stdev = column_params[
self.source_config.profiling.stdev
]

dataset_profile.fieldProfiles.append(column_profile)

Expand All @@ -831,9 +837,7 @@ def _create_profile_mcp(
def get_profile_if_enabled(
self, mce: MetadataChangeEventClass, database_name: str, table_name: str
) -> Iterable[MetadataWorkUnit]:
# We don't need both checks only the second one
# but then lint believes that GlueProfilingConfig can be None
if self.source_config.profiling and self.source_config.is_profiling_enabled():
if self.source_config.is_profiling_enabled():
# for cross-account ingestion
kwargs = dict(
DatabaseName=database_name,
Expand Down Expand Up @@ -861,21 +865,22 @@ def get_profile_if_enabled(
partitions = response["Partitions"]
partition_keys = [k["Name"] for k in partition_keys]

for p in partitions:
table_stats = p.get("Parameters", {})
column_stats = p["StorageDescriptor"]["Columns"]

# only support single partition key
partition_spec = str({partition_keys[0]: p["Values"][0]})

if self.source_config.profiling.partition_patterns.allowed(
partition_spec
):
yield self._create_profile_mcp(
mce, table_stats, column_stats, partition_spec
).as_workunit()
else:
continue
with ThreadPoolExecutor(
max_workers=self.source_config.profiling.max_workers
) as executor:
futures = [
executor.submit(
self._create_partition_profile_mcp, mce, partition_keys, p
)
for p in partitions
]
for future in as_completed(futures):
try:
result = future.result()
if result:
yield result
except Exception as e:
logger.error(f"Error profiling partition: {e}")
else:
# ingest data profile without partition
table_stats = response["Table"]["Parameters"]
Expand All @@ -884,6 +889,21 @@ def get_profile_if_enabled(
mce, table_stats, column_stats
).as_workunit()

def _create_partition_profile_mcp(
self,
mce: MetadataChangeEventClass,
partition_keys: List[str],
partition: Dict[str, Any],
) -> Optional[MetadataWorkUnit]:
table_stats = partition.get("Parameters", {})
column_stats = partition["StorageDescriptor"]["Columns"]
partition_spec = str({partition_keys[0]: partition["Values"][0]})
if self.source_config.profiling.partition_patterns.allowed(partition_spec):
return self._create_profile_mcp(
mce, table_stats, column_stats, partition_spec
).as_workunit()
return None

def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import Optional

from pydantic.fields import Field
Expand All @@ -7,6 +8,22 @@


class GlueProfilingConfig(ConfigModel):
enabled: bool = Field(
default=False,
description="Whether profiling should be done.",
)
profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only, or include column-level profiling as well.",
)
# The default of (5 * cpu_count) is adopted from the default max_workers
# parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound
# task, it may make sense to increase this default value in the future.
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
max_workers: int = Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for profiling. Set to 1 to disable.",
)
row_count: Optional[str] = Field(
default=None,
description="The parameter name for row count in glue table.",
Expand Down
Loading

0 comments on commit 467b489

Please sign in to comment.