Skip to content

Commit

Permalink
Fix Snowflake adapter warnings + add more debug logs (#107)
Browse files Browse the repository at this point in the history
* Fix pydantic dependency warning with snowflake-connector-python

* Fix pydantic warning according to using attr name that corresponds to parent class method

* Update snowflake tests according to client and adapter changes + pipes

* Linting

* Add debugging logs

* Modify properties to ttl cached methods due to avoid duplication of nesting logging

* Add invocation of Snowflake adapter tests in workflow

* Bump generic collector version

---------

Co-authored-by: Valerii Mironchenko <vmironchenko@provectus.com>
  • Loading branch information
ValeriyWorld and Valerii Mironchenko authored Sep 9, 2024
1 parent a349208 commit 90b447b
Show file tree
Hide file tree
Showing 14 changed files with 418 additions and 330 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/run-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ jobs:
unixodbc-dev openssl libsasl2-dev
poetry install
# for now, only tests for PostgreSQL adapter are being invoked, others need to be checked and updated for future use
# for now, only tests for PostgreSQL and Snowflake adapters are being invoked,
# others need to be checked and updated for future use
- name: Run tests
working-directory: odd-collector
run: poetry run pytest tests/integration/test_postgres.py -v
run: |
poetry run pytest tests/integration/test_postgres.py -v
poetry run pytest tests/adapters/snowflake -v
2 changes: 1 addition & 1 deletion odd-collector/odd_collector/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = "0.1.64"
VERSION = "0.1.65"
42 changes: 23 additions & 19 deletions odd-collector/odd_collector/adapters/snowflake/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ def _fk_constraints(self) -> list[ForeignKeyConstraint]:
def _unique_constraints(self) -> list[UniqueConstraint]:
return self._get_metadata()["unique_constraints"]

@property
def _pipe_entities(self) -> list[tuple[Pipe, DataEntity]]:
@ttl_cache(ttl=CACHE_TTL)
def get_pipe_entities(self) -> list[tuple[Pipe, DataEntity]]:
pipes: list[Pipe] = []
for raw_pipe in self._raw_pipes:
pipes.extend(
Pipe(
catalog=raw_pipe.pipe_catalog,
schema=raw_pipe.pipe_schema,
schema_name=raw_pipe.pipe_schema,
name=raw_pipe.pipe_name,
definition=raw_pipe.definition,
stage_url=raw_stage.stage_url,
Expand All @@ -89,8 +89,8 @@ def _pipe_entities(self) -> list[tuple[Pipe, DataEntity]]:
)
return [(pipe, map_pipe(pipe, self.generator)) for pipe in pipes]

@property
def _table_entities(self) -> list[tuple[Table, DataEntity]]:
@ttl_cache(ttl=CACHE_TTL)
def get_table_entities(self) -> list[tuple[Table, DataEntity]]:
result = []

for table in self._tables:
Expand All @@ -100,32 +100,36 @@ def _table_entities(self) -> list[tuple[Table, DataEntity]]:
result.append((table, map_table(table, self.generator)))
return result

@property
def _relationship_entities(self) -> list[DataEntity]:
@ttl_cache(ttl=CACHE_TTL)
def get_relationship_entities(self) -> list[DataEntity]:
return DataEntityRelationshipsMapper(
oddrn_generator=self.generator,
unique_constraints=self._unique_constraints,
table_entities_pair=self._table_entities,
table_entities_pair=self.get_table_entities(),
).map(self._fk_constraints)

@property
def _schema_entities(self) -> list[DataEntity]:
return map_schemas(self._table_entities, self._pipe_entities, self.generator)
@ttl_cache(ttl=CACHE_TTL)
def get_schema_entities(self) -> list[DataEntity]:
return map_schemas(
self.get_table_entities(), self.get_pipe_entities(), self.generator
)

@property
def _database_entity(self) -> DataEntity:
return map_database(self._database_name, self._schema_entities, self.generator)
@ttl_cache(ttl=CACHE_TTL)
def get_database_entity(self) -> DataEntity:
return map_database(
self._database_name, self.get_schema_entities(), self.generator
)

def get_data_entity_list(self) -> DataEntityList:
try:
return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[
*[te[1] for te in self._table_entities],
*self._schema_entities,
self._database_entity,
*[pe[1] for pe in self._pipe_entities],
*self._relationship_entities,
*[te[1] for te in self.get_table_entities()],
*self.get_schema_entities(),
self.get_database_entity(),
*[pe[1] for pe in self.get_pipe_entities()],
*self.get_relationship_entities(),
],
)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def stage_full_name(self) -> str:

class Pipe(Connectable):
catalog: str
schema: str
schema_name: str
name: str
definition: str
stage_url: Optional[str] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, List

from odd_collector.adapters.snowflake.domain import Column
from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataSetField, DataSetFieldType, Type
from oddrn_generator import SnowflakeGenerator

Expand Down Expand Up @@ -52,6 +53,10 @@ def map_columns(
result: List[DataSetField] = []

for column in columns:
logger.debug(
f"Mapping column from table {column.table_schema}.{column.table_name}: {column.column_name}"
)

column_oddrn_key = f"{parent_path.value}_columns"
generator_params = {column_oddrn_key: column.column_name}
generator.set_oddrn_paths(**generator_params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List

from funcy import lpluck_attr
from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import SnowflakeGenerator

Expand All @@ -11,6 +12,8 @@ def map_database(
schemas_entities: List[DataEntity],
generator: SnowflakeGenerator,
) -> DataEntity:
logger.debug(f"Mapping database: {database_name}")

generator = deepcopy(generator)
oddrn = generator.get_oddrn_by_path("databases", database_name)
return DataEntity(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import logging
from abc import abstractmethod
from copy import deepcopy
from typing import List

import sqlparse
from odd_collector.adapters.snowflake.domain import Pipe
from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityType, DataTransformer
from oddrn_generator import SnowflakeGenerator
from oddrn_generator.generators import S3Generator

from .view import _map_connection

logger = logging.getLogger("Snowpipe")


def map_pipe(pipe: Pipe, generator: SnowflakeGenerator) -> DataEntity:
full_pipe_name = f"{pipe.schema_name}.{pipe.name}"
logger.debug(f"Mapping pipe: {full_pipe_name}")

generator = deepcopy(generator)
generator.set_oddrn_paths(schemas=pipe.schema, pipes=pipe.name)
generator.set_oddrn_paths(schemas=pipe.schema_name, pipes=pipe.name)

return DataEntity(
oddrn=generator.get_oddrn_by_path("pipes"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Table,
UniqueConstraint,
)
from odd_collector.adapters.snowflake.logger import logger
from odd_collector.adapters.snowflake.mappers.relationships.relationship_mapper import (
RelationshipMapper,
)
Expand Down Expand Up @@ -35,6 +36,11 @@ def _map_data_entity_relationship(
referenced_schema_name = fk_cons.referenced_schema_name
referenced_table_name = fk_cons.referenced_table_name

logger.debug(
f"Mapping relationship referencing to the table {referenced_schema_name}.{referenced_table_name}: "
f"{fk_cons.constraint_name}"
)

self.oddrn_generator.set_oddrn_paths(
schemas=schema_name,
tables=table_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import defaultdict
from copy import deepcopy

from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import SnowflakeGenerator

Expand All @@ -20,11 +21,13 @@ def map_schemas(
grouped[table.table_catalog][table.table_schema].add(entity.oddrn)

for pipe, entity in pipe_entities:
grouped[pipe.catalog][pipe.schema].add(entity.oddrn)
grouped[pipe.catalog][pipe.schema_name].add(entity.oddrn)

entities = []
for catalog, schemas in grouped.items():
for schema, oddrns in schemas.items():
logger.debug(f"Mapping schema: {schema}")

generator.set_oddrn_paths(databases=catalog, schemas=schema)
oddrn = generator.get_oddrn_by_path("schemas")
entity = DataEntity(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from copy import deepcopy

from odd_collector.adapters.snowflake.domain import Table
from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityType, DataSet
from oddrn_generator import SnowflakeGenerator

Expand All @@ -11,6 +12,9 @@


def map_table(table: Table, generator: SnowflakeGenerator) -> DataEntity:
full_table_name = f"{table.table_schema}.{table.table_name}"
logger.debug(f"Mapping table: {full_table_name}")

generator = deepcopy(generator)
generator.set_oddrn_paths(schemas=table.table_schema, tables=table.table_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@


def map_view(view: View, generator: SnowflakeGenerator) -> DataEntity:
full_view_name = f"{view.table_schema}.{view.table_name}"
logger.debug(f"Mapping view: {full_view_name}")

generator = deepcopy(generator)
generator.set_oddrn_paths(schemas=view.table_schema, views=view.table_name)

sql = None
try:
sql = sqlparse.format(view.view_definition)
except Exception:
logger.warning(f"Couldn't parse view definition {view.view_definition}")
logger.warning(
f"Couldn't parse {full_view_name} view definition: {view.view_definition}"
)

return DataEntity(
oddrn=generator.get_oddrn_by_path("views"),
Expand Down
Loading

0 comments on commit 90b447b

Please sign in to comment.