Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(ingest/unity): Add Unity Catalog memory performance testing #8932

Merged
merged 2 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ class Table(CommonProperty):
columns: List[Column]
storage_location: Optional[str]
data_source_format: Optional[DataSourceFormat]
comment: Optional[str]
table_type: TableType
owner: Optional[str]
generation: Optional[int]
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.utilities.perf_timer import PerfTimer
from tests.performance.bigquery import generate_events, ref_from_table
from tests.performance.bigquery.bigquery_events import generate_events, ref_from_table
from tests.performance.data_generation import (
NormalDistribution,
generate_data,
Expand All @@ -33,7 +33,7 @@ def run_test():
num_views=2000,
time_range=timedelta(days=7),
)
all_tables = seed_metadata.tables + seed_metadata.views
all_tables = seed_metadata.all_tables

config = BigQueryV2Config(
start_time=seed_metadata.start_time,
Expand Down
53 changes: 47 additions & 6 deletions metadata-ingestion/tests/performance/data_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, TypeVar
from typing import Iterable, List, TypeVar, Union, cast

from faker import Faker

from tests.performance.data_model import (
Column,
ColumnMapping,
ColumnType,
Container,
FieldAccess,
Query,
Expand Down Expand Up @@ -52,48 +55,74 @@ def sample_with_floor(self, floor: int = 1) -> int:

@dataclass
class SeedMetadata:
containers: List[Container]
# Each list is a layer of containers, e.g. [[databases], [schemas]]
containers: List[List[Container]]

tables: List[Table]
views: List[View]
start_time: datetime
end_time: datetime

@property
def all_tables(self) -> List[Table]:
return self.tables + cast(List[Table], self.views)


def generate_data(
num_containers: int,
num_containers: Union[List[int], int],
num_tables: int,
num_views: int,
columns_per_table: NormalDistribution = NormalDistribution(5, 2),
parents_per_view: NormalDistribution = NormalDistribution(2, 1),
view_definition_length: NormalDistribution = NormalDistribution(150, 50),
time_range: timedelta = timedelta(days=14),
) -> SeedMetadata:
containers = [Container(f"container-{i}") for i in range(num_containers)]
# Assemble containers
if isinstance(num_containers, int):
num_containers = [num_containers]

containers: List[List[Container]] = []
for i, num_in_layer in enumerate(num_containers):
layer = [
Container(
f"{i}-container-{j}",
parent=random.choice(containers[-1]) if containers else None,
)
for j in range(num_in_layer)
]
containers.append(layer)

# Assemble tables
tables = [
Table(
f"table-{i}",
container=random.choice(containers),
container=random.choice(containers[-1]),
columns=[
f"column-{j}-{uuid.uuid4()}"
for j in range(columns_per_table.sample_with_floor())
],
column_mapping=None,
)
for i in range(num_tables)
]
views = [
View(
f"view-{i}",
container=random.choice(containers),
container=random.choice(containers[-1]),
columns=[
f"column-{j}-{uuid.uuid4()}"
for j in range(columns_per_table.sample_with_floor())
],
column_mapping=None,
definition=f"{uuid.uuid4()}-{'*' * view_definition_length.sample_with_floor(10)}",
parents=random.sample(tables, parents_per_view.sample_with_floor()),
)
for i in range(num_views)
]

for table in tables + views:
_generate_column_mapping(table)

now = datetime.now(tz=timezone.utc)
return SeedMetadata(
containers=containers,
Expand Down Expand Up @@ -162,6 +191,18 @@ def generate_queries(
)


def _generate_column_mapping(table: Table) -> ColumnMapping:
    """Attach randomly generated column metadata to ``table``.

    One ``Column`` is built for every entry of ``table.columns`` and keyed by
    that entry. Each column gets a type chosen at random from ``ColumnType``
    and a randomly chosen nullable flag.

    The resulting mapping is stored on ``table.column_mapping`` and is also
    returned to the caller.
    """
    available_types = list(ColumnType)
    mapping: ColumnMapping = {
        column_ref: Column(
            name=column_ref,
            type=random.choice(available_types),
            nullable=random.random() < 0.1,  # Fixed 10% chance for now
        )
        for column_ref in table.columns
    }
    table.column_mapping = mapping
    return mapping


def _sample_list(lst: List[T], dist: NormalDistribution, floor: int = 1) -> List[T]:
    """Return a random sample of ``lst``, drawn without replacement.

    The requested sample size comes from ``dist.sample_with_floor(floor)`` and
    is capped at ``len(lst)`` so ``random.sample`` is never asked for more
    elements than the list holds.
    """
    sample_size = min(dist.sample_with_floor(floor), len(lst))
    return random.sample(lst, sample_size)

Expand Down
31 changes: 27 additions & 4 deletions metadata-ingestion/tests/performance/data_model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from enum import Enum
from typing import Dict, List, Optional

from typing_extensions import Literal

Column = str
StatementType = Literal[ # SELECT + values from OperationTypeClass
"SELECT",
"INSERT",
Expand All @@ -21,13 +21,36 @@
@dataclass
class Container:
    """A named container for generated tables and views.

    Containers may be nested: ``parent`` refers to the enclosing container and
    is ``None`` for a container with no parent.
    """

    # Name of this container.
    name: str
    # Enclosing container, if any.
    parent: Optional["Container"] = None


class ColumnType(str, Enum):
    """Data types that a generated column can have.

    Every member is also a ``str`` whose value equals the member's name.
    """

    # Can add types that take parameters in the future

    INTEGER = "INTEGER"
    FLOAT = "FLOAT"  # Double precision (64 bit)
    STRING = "STRING"
    BOOLEAN = "BOOLEAN"
    DATETIME = "DATETIME"


@dataclass
class Column:
    """Metadata describing a single generated column."""

    # Name of the column.
    name: str
    # Data type of the column.
    type: ColumnType
    # Whether the column may hold null values.
    nullable: bool


# A column is referred to by a plain string; the data generator uses the
# column's name as that string.
ColumnRef = str
# Lookup from a column reference to the metadata of that column.
ColumnMapping = Dict[ColumnRef, Column]


@dataclass
class Table:
name: str
container: Container
columns: List[Column]
columns: List[ColumnRef]
column_mapping: Optional[ColumnMapping]

def is_view(self) -> bool:
return False
Expand All @@ -44,7 +67,7 @@ def is_view(self) -> bool:

@dataclass
class FieldAccess:
column: Column
column: ColumnRef
table: Table


Expand Down
Empty file.
87 changes: 87 additions & 0 deletions metadata-ingestion/tests/performance/databricks/test_unity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging
import os
from typing import Iterable, Tuple
from unittest.mock import patch

import humanfriendly
import psutil
from performance.databricks.unity_proxy_mock import UnityCatalogApiProxyMock
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

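These imports should use the full `tests.performance...` package path (e.g. `from tests.performance.databricks.unity_proxy_mock import UnityCatalogApiProxyMock`), consistent with the other `tests.performance` imports in this file.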


from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig
from datahub.ingestion.source.unity.source import UnityCatalogSource
from datahub.utilities.perf_timer import PerfTimer
from tests.performance.data_generation import (
NormalDistribution,
generate_data,
generate_queries,
)


def run_test():
    """Run Unity Catalog ingestion against generated data and print its cost.

    Steps:
      1. Generate seed metadata (three container layers, tables, views) and
         queries, and wrap them in a ``UnityCatalogApiProxyMock``.
      2. Build a ``UnityCatalogSource`` while ``UnityCatalogApiProxy`` is
         patched to hand back that mock.
      3. Drain the source's workunits, then print the workunit count, the
         elapsed time, the growth in memory over the pre-ingestion baseline,
         and the source report's aspects.
    """
    seed_metadata = generate_data(
        num_containers=[1, 100, 5000],
        num_tables=50000,
        num_views=10000,
        columns_per_table=NormalDistribution(100, 50),
        parents_per_view=NormalDistribution(5, 5),
        view_definition_length=NormalDistribution(1000, 300),
    )
    generated_queries = generate_queries(
        seed_metadata,
        num_selects=100000,
        num_operations=100000,
        num_unique_queries=10000,
        num_users=1000,
    )
    mock_proxy = UnityCatalogApiProxyMock(
        seed_metadata, queries=generated_queries, num_service_principals=10000
    )
    print("Data generated")

    source_config = UnityCatalogSourceConfig(
        token="", workspace_url="http://localhost:1234", include_usage_statistics=False
    )
    pipeline_context = PipelineContext(run_id="test")
    # The patch only needs to be active while the source is constructed.
    with patch(
        "datahub.ingestion.source.unity.source.UnityCatalogApiProxy",
        lambda *args, **kwargs: mock_proxy,
    ):
        source: UnityCatalogSource = UnityCatalogSource(
            pipeline_context, source_config
        )

    # Baseline taken after data generation, so it includes the test data itself.
    current_process = psutil.Process(os.getpid())
    pre_mem_usage = current_process.memory_info().rss
    print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}")

    with PerfTimer() as perf_timer:
        num_workunits, peak_memory_usage = workunit_sink(source.get_workunits())
        print(f"Workunits Generated: {num_workunits}")
        print(f"Seconds Elapsed: {perf_timer.elapsed_seconds():.2f} seconds")

    memory_growth = peak_memory_usage - pre_mem_usage
    print(f"Peak Memory Used: {humanfriendly.format_size(memory_growth)}")
    print(source.report.aspects)


def workunit_sink(workunits: Iterable[MetadataWorkUnit]) -> Tuple[int, int]:
    """Drain ``workunits`` and report their count and the peak memory seen.

    The process's resident set size (RSS) is sampled before iteration starts,
    on every 10,000th workunit while iterating, and once more after the
    iterable is exhausted.

    Args:
        workunits: Iterable to consume; the workunits themselves are discarded.

    Returns:
        A ``(num_workunits, peak_memory_usage)`` tuple, where
        ``num_workunits`` is the number of workunits consumed and
        ``peak_memory_usage`` is the largest RSS value sampled.
    """
    # NOTE(review): a reviewer flagged this as copied from another performance
    # test and asked for the common logic to be factored out into a shared
    # helper.
    process = psutil.Process(os.getpid())
    peak_memory_usage = process.memory_info().rss
    num_workunits = 0
    for _ in workunits:
        # Sampling is comparatively expensive, so only do it periodically.
        if num_workunits % 10_000 == 0:
            peak_memory_usage = max(peak_memory_usage, process.memory_info().rss)
        num_workunits += 1
    peak_memory_usage = max(peak_memory_usage, process.memory_info().rss)

    # Return the true count. The last enumerate() index is one less than the
    # number of items, and is 0 for both an empty and a single-item iterable.
    return num_workunits, peak_memory_usage


if __name__ == "__main__":
    # When run as a script, emit INFO-and-above records from the root logger
    # through a stream handler before starting the test.
    root_logger = logging.getLogger()
    root_logger.addHandler(logging.StreamHandler())
    root_logger.setLevel(logging.INFO)
    run_test()
Loading
Loading