Skip to content

Commit

Permalink
fix(ingest/profiling): Filter tables early based on profile pattern f…
Browse files Browse the repository at this point in the history
…ilter (#10378)
  • Loading branch information
treff7es authored Apr 26, 2024
1 parent 4add9b1 commit 7e69247
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView
from datahub.ingestion.source.sql.sql_utils import check_table_with_profile_pattern
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionType
Expand All @@ -36,6 +37,10 @@ class DetailedProfilerReportMixin:
default_factory=int_top_k_dict
)

profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(
default_factory=int_top_k_dict
)

profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)

num_tables_not_eligible_profiling: Dict[str, int] = field(
Expand Down Expand Up @@ -272,8 +277,17 @@ def is_dataset_eligible_for_profiling(
threshold_time = datetime.now(timezone.utc) - timedelta(
self.config.profiling.profile_if_updated_since_days
)

schema_name = dataset_name.rsplit(".", 1)[0]

if not check_table_with_profile_pattern(
self.config.profile_pattern, dataset_name
):
self.report.profiling_skipped_table_profile_pattern[schema_name] += 1
logger.debug(
f"Table {dataset_name} is not allowed for profiling due to profile pattern"
)
return False

if (threshold_time is not None) and (
last_altered is not None and last_altered < threshold_time
):
Expand Down
25 changes: 25 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from typing import Dict, Iterable, List, Optional

from datahub.configuration.common import AllowDenyPattern
Expand Down Expand Up @@ -235,3 +236,27 @@ def schema_requires_v2(canonical_schema: List[SchemaField]) -> bool:
if ARRAY_TOKEN in field_name or UNION_TOKEN in field_name:
return True
return False


CHECK_TABLE_TABLE_PART_SEPARATOR_PATTERN = re.compile("\\\\?\\.")


def check_table_with_profile_pattern(
profile_pattern: AllowDenyPattern, table_name: str
) -> bool:
parts = len(table_name.split("."))
allow_list: List[str] = []

for pattern in profile_pattern.allow:
replaced_pattern = pattern.replace(".*", "").replace(".+", "")
splits = re.split(CHECK_TABLE_TABLE_PART_SEPARATOR_PATTERN, replaced_pattern)
if parts + 1 == len(splits):
table_pattern = pattern[: pattern.find(splits[-2]) + len(splits[-2])]
allow_list.append(table_pattern + "$")
else:
allow_list.append(pattern)

table_allow_deny_pattern = AllowDenyPattern(
allow=allow_list, deny=profile_pattern.deny
)
return table_allow_deny_pattern.allowed(table_name)
66 changes: 65 additions & 1 deletion metadata-ingestion/tests/unit/test_sql_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from datahub.ingestion.source.sql.sql_utils import gen_schema_key
import pytest

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.sql.sql_utils import (
check_table_with_profile_pattern,
gen_schema_key,
)


def test_guid_generators():
Expand All @@ -13,3 +19,61 @@ def test_guid_generators():

guid = schema_key.guid()
assert guid == expected_guid


test_profile_pattern_matching_on_table_allow_list_test_data = [
("db.table.column", "db.table", True),
("db.table.column2", "db.table", True),
("db.table..*", "db.table", True),
("db.*", "db.table", True),
("db.*", "db.table", True),
("db.*", "db.schema.table", True),
("db.schema.*", "db.schema.table", True),
("db\\.schema\\..*", "db.schema.table", True),
("db\\.schema\\.table\\.column_prefix.*", "db.schema.table", True),
("db\\.schema\\.table\\.column", "db.schema.table", True),
("db\\.schema\\.table2\\.column", "db.schema.table", False),
("db2\\.schema.*", "db.schema.table", False),
("db2\\.schema.*", "db.schema.table", False),
("db\\.schema\\.table\\..*", "db.table2", False),
]


@pytest.mark.parametrize(
"allow_pattern, table_name, result",
test_profile_pattern_matching_on_table_allow_list_test_data,
)
def test_profile_pattern_matching_on_table_allow_list(
allow_pattern: str, table_name: str, result: bool
) -> None:
pattern = AllowDenyPattern(allow=[allow_pattern])
assert check_table_with_profile_pattern(pattern, table_name) == result


test_profile_pattern_matching_on_table_deny_list_test_data = [
("db.table.column", "db.table", True),
("db.table.column2", "db.table", True),
("db.table..*", "db.table", True),
("db.*", "db.table", False),
("db.*", "db.table", False),
("db.*", "db.schema.table", False),
("db.schema.*", "db.schema.table", False),
("db\\.schema\\..*", "db.schema.table", False),
("db\\.schema\\.table\\.column_prefix.*", "db.schema.table", True),
("db\\.schema\\.table\\.column", "db.schema.table", True),
("db\\.schema\\.table2\\.column", "db.schema.table", True),
("db2\\.schema.*", "db.schema.table", True),
("db2\\.schema.*", "db.schema.table", True),
("db\\.schema\\.table\\..*", "db.table2", True),
]


@pytest.mark.parametrize(
"deny_pattern, table_name, result",
test_profile_pattern_matching_on_table_deny_list_test_data,
)
def test_profile_pattern_matching_on_table_deny_list(
deny_pattern: str, table_name: str, result: bool
) -> None:
pattern = AllowDenyPattern(deny=[deny_pattern])
assert check_table_with_profile_pattern(pattern, table_name) == result

0 comments on commit 7e69247

Please sign in to comment.