Skip to content

Commit

Permalink
fix: Resolve PR Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Jul 29, 2024
1 parent cad9e8e commit 5646c18
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 54 deletions.
47 changes: 15 additions & 32 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field as dataclass_field
from functools import lru_cache
from typing import (
Expand Down Expand Up @@ -948,22 +947,21 @@ def get_profile_if_enabled(
partitions = response["Partitions"]
partition_keys = [k["Name"] for k in partition_keys]

with ThreadPoolExecutor(
max_workers=self.source_config.profiling.max_workers
) as executor:
futures = [
executor.submit(
self._create_partition_profile_mcp, mce, partition_keys, p
)
for p in partitions
]
for future in as_completed(futures):
try:
result = future.result()
if result:
yield result
except Exception as e:
logger.error(f"Error profiling partition: {e}")
for p in partitions:
table_stats = p.get("Parameters", {})
column_stats = p["StorageDescriptor"]["Columns"]

# only support single partition key
partition_spec = str({partition_keys[0]: p["Values"][0]})

if self.source_config.profiling.partition_patterns.allowed(
partition_spec
):
yield self._create_profile_mcp(
mce, table_stats, column_stats, partition_spec
).as_workunit()
else:
continue
else:
# ingest data profile without partition
table_stats = response["Table"]["Parameters"]
Expand All @@ -972,21 +970,6 @@ def get_profile_if_enabled(
mce, table_stats, column_stats
).as_workunit()

def _create_partition_profile_mcp(
    self,
    mce: MetadataChangeEventClass,
    partition_keys: List[str],
    partition: Dict[str, Any],
) -> Optional[MetadataWorkUnit]:
    """Build a profiling workunit for a single Glue table partition.

    Table-level stats come from the partition's ``Parameters`` map and
    column-level stats from its ``StorageDescriptor``. Only a single
    partition key is supported: the partition spec is the stringified
    ``{first_key: first_value}`` mapping. Returns ``None`` when that spec
    is rejected by the configured ``partition_patterns`` filter.
    """
    stats_for_table = partition.get("Parameters", {})
    stats_for_columns = partition["StorageDescriptor"]["Columns"]
    # NOTE: only partition_keys[0] is consulted — multi-key partitions
    # are collapsed to their first key/value pair.
    spec = str({partition_keys[0]: partition["Values"][0]})

    # Guard clause: skip partitions excluded by the allow/deny patterns.
    if not self.source_config.profiling.partition_patterns.allowed(spec):
        return None

    profile_mcp = self._create_profile_mcp(
        mce, stats_for_table, stats_for_columns, spec
    )
    return profile_mcp.as_workunit()

def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from typing import Optional

from pydantic.fields import Field
Expand All @@ -16,14 +15,6 @@ class GlueProfilingConfig(ConfigModel):
default=False,
description="Whether to perform profiling at table-level only, or include column-level profiling as well.",
)
# The default of (5 * cpu_count) is adopted from the default max_workers
# parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound
# task, it may make sense to increase this default value in the future.
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
max_workers: int = Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for profiling. Set to 1 to disable.",
)
row_count: Optional[str] = Field(
default=None,
description="The parameter name for row count in glue table.",
Expand Down
16 changes: 3 additions & 13 deletions metadata-ingestion/tests/unit/test_glue_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,24 +686,14 @@ def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
)


@pytest.mark.parametrize(
"platform_instance, mce_file, mce_golden_file",
[
(None, "glue_mces.json", "glue_mces_golden_profiling.json"),
],
)
@freeze_time(FROZEN_TIME)
def test_glue_ingest_with_profiling(
tmp_path: Path,
pytestconfig: PytestConfig,
platform_instance: str,
mce_file: str,
mce_golden_file: str,
) -> None:
glue_source_instance = glue_source_with_profiling(
platform_instance=platform_instance
)

glue_source_instance = glue_source_with_profiling()
mce_file = "glue_mces.json"
mce_golden_file = "glue_mces_golden_profiling.json"
with Stubber(glue_source_instance.glue_client) as glue_stubber:
glue_stubber.add_response("get_databases", get_databases_response_profiling, {})

Expand Down

0 comments on commit 5646c18

Please sign in to comment.