Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service upgrades #44

Merged
merged 7 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions databuilder/databuilder/models/description_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ def create_description_metadata(text: Union[None, str],
return description_node

def get_description_id(self) -> str:
return self.get_id(self.source)
return DescriptionMetadata.get_id(self.source)

def get_id(self, source: str) -> str:
def get_id(source: str) -> str:
if source == DescriptionMetadata.DEFAULT_SOURCE:
return DescriptionMetadata.DEFAULT_DESCRIPTION_ID
else:
Expand Down
14 changes: 12 additions & 2 deletions databuilder/databuilder/publisher/neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,23 @@ def _try_create_index(self, label: str) -> None:
:return:
"""
stmt = Template("""
CREATE CONSTRAINT ON (node:{{ LABEL }}) ASSERT node.key IS UNIQUE
CREATE CONSTRAINT FOR (node:{{ LABEL }}) REQUIRE node.key IS UNIQUE
""").render(LABEL=label)

LOGGER.info(f'Trying to create index for label {label} if not exist: {stmt}')
with self._driver.session(database=self._db_name) as session:
try:
session.run(stmt)
result = session.run(f"""
SHOW CONSTRAINTS
YIELD name, type, entityType, labelsOrTypes, properties
WHERE type = 'UNIQUENESS' AND entityType = 'NODE' AND labelsOrTypes = ['{label}'] AND properties = ['key']
RETURN count(*) AS constraintExists
""")
constraint_exists = result.single()["constraintExists"]

# Step 2: Conditionally create the constraint
if constraint_exists == 0:
session.run(stmt)
except Neo4jError as e:
if 'An equivalent constraint already exists' not in e.__str__():
raise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def _try_create_index(self, label: str) -> None:
"""
# TODO: apoc.schema.assert may help here for handling neo4j 3.x and 4.x
stmt = """
CREATE CONSTRAINT ON (node:label) ASSERT node.key IS UNIQUE
CREATE CONSTRAINT FOR (node:label) REQUIRE node.key IS UNIQUE
"""

LOGGER.info(f'Trying to create index for label {label} if not exist: {stmt}')
Expand Down
21 changes: 16 additions & 5 deletions databuilder/databuilder/utils/publisher_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,24 @@ def create_neo4j_node_key_constraint(node_file: str,
if label not in labels:
with driver.session(database=db_name) as session:
try:
create_stmt = Template("""
CREATE CONSTRAINT ON (node:{{ LABEL }}) ASSERT node.key IS UNIQUE
""").render(LABEL=label)
result = session.run(f"""
SHOW CONSTRAINTS
YIELD name, type, entityType, labelsOrTypes, properties
WHERE type = 'UNIQUENESS' AND entityType = 'NODE' AND labelsOrTypes = ['{label}'] AND properties = ['key']
RETURN count(*) AS constraintExists
""")
constraint_exists = result.single()["constraintExists"]

LOGGER.info(f'Trying to create index for label {label} if not exist: {create_stmt}')
# Step 2: Conditionally create the constraint
if constraint_exists == 0:

session.write_transaction(execute_neo4j_statement, create_stmt)
create_stmt = Template("""
CREATE CONSTRAINT FOR (node:{{ LABEL }}) REQUIRE node.key IS UNIQUE
""").render(LABEL=label)

LOGGER.info(f'Trying to create index for label {label} if not exist: {create_stmt}')

session.write_transaction(execute_neo4j_statement, create_stmt)
except Neo4jError as e:
if e.code != NEO4J_EQUIVALENT_SCHEMA_RULE_ALREADY_EXISTS_ERROR_CODE\
and e.code != NEO4J_INDEX_ALREADY_EXISTS_ERROR_CODE:
Expand Down
2 changes: 2 additions & 0 deletions databuilder/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ jsonref==0.2
# amundsen-common>=0.16.0
amundsen-common==0.31.0+foodtruck.6
amundsen-rds==0.0.8

queryparser-python3==0.7.0
9 changes: 7 additions & 2 deletions databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

__version__ = '7.4.4+foodtruck.8'
__version__ = '7.4.4+foodtruck.9'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'requirements.txt')
Expand Down Expand Up @@ -99,9 +99,14 @@
'python-schema-registry-client==2.4.0'
]

query_parsing = [
'queryparser-python3>=0.7.0',
'sqlglot>=25.6.1'
]

all_deps = requirements + requirements_dev + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds \
+ atlas + salesforce + oracle + teradata + schema_registry
+ atlas + salesforce + oracle + teradata + schema_registry + query_parsing

setup(
name='amundsen-databuilder',
Expand Down
1 change: 1 addition & 0 deletions frontend/amundsen_application/api/metadata/v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@ def get_table_lineage() -> Response:
url = f'{table_endpoint}/{table_key}/lineage?depth={depth}&direction={direction}'
response = request_metadata(url=url, method=request.method)
json = response.json()
LOGGER.info(f'DREW={json}')
downstream = [marshall_lineage_item(item) for item in json.get('downstream_entities')]
upstream = [marshall_lineage_item(item) for item in json.get('upstream_entities')]
downstream_count = json.get('downstream_count')
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export const SNOWFLAKE = 'Snowflake';
export const MYSQL = 'MySQL';
export const MSSQL = 'MSSQL';
export const CLICKHOUSE = 'ClickHouse';
export const REAL_INSIGHT = 'RealINSIGHT';
export const NOT_AVAILABLE_VALUE = 'not available';
export const DELAY_SHOW_POPOVER_MS = 500;
export const GITHUB_LOGO_PATH = '/static/images/github.png';
Expand All @@ -15,19 +16,22 @@ export const MYSQL_LOGO_PATH = '/static/images/mysql.png';
export const MSSQL_LOGO_PATH = '/static/images/mssql.png';
export const CLICKHOUSE_LOGO_PATH = '/static/images/clickhouse.png';
export const DATABASE_LOGO_PATH = '/static/images/database.png';
export const REALINSIGHT_LOGO_PATH = '/static/images/real_insight.png';
export const SOURCE_TYPE_TO_NAME = {
"github": GITHUB,
"aws_s3": AWSS3,
"snowflake": SNOWFLAKE,
"mysql": MYSQL,
"mssql": MSSQL,
"clickhouse": CLICKHOUSE,
"real_insight": REAL_INSIGHT,
}
export const SOURCE_TYPE_TO_IMAGE = {
"github": GITHUB_LOGO_PATH,
"aws_s3": AWSS3_LOGO_PATH,
"snowflake": SNOWFLAKE_LOGO_PATH,
"mysql": MYSQL_LOGO_PATH,
"mssql": MSSQL_LOGO_PATH,
"clickhouse": CLICKHOUSE_LOGO_PATH
"clickhouse": CLICKHOUSE_LOGO_PATH,
"real_insight": REALINSIGHT_LOGO_PATH
}
2 changes: 1 addition & 1 deletion metadata/metadata_service/api/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get(self, table_uri: str, column_name: str) -> Iterable[Union[Mapping, int,
schema = LineageSchema()
return schema.dump(lineage), HTTPStatus.OK
except Exception as e:
return {'message': f'Exception raised when getting lineage: {e}'}, HTTPStatus.NOT_FOUND
return {'message': f'Exception raised when getting column lineage: {e}'}, HTTPStatus.NOT_FOUND


class ColumnDescriptionAPI(Resource):
Expand Down
2 changes: 1 addition & 1 deletion metadata/metadata_service/api/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,4 @@ def get(self, id: str) -> Iterable[Union[Mapping, int, None]]:
schema = LineageSchema()
return schema.dump(lineage), HTTPStatus.OK
except Exception as e:
return {'message': f'Exception raised when getting lineage: {e}'}, HTTPStatus.NOT_FOUND
return {'message': f'Exception raised when getting file lineage: {e}'}, HTTPStatus.NOT_FOUND
2 changes: 1 addition & 1 deletion metadata/metadata_service/api/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def get(self, id: str) -> Iterable[Union[Mapping, int, None]]:
return {'message': f'feature_uri {id} lineage does not exist'}, HTTPStatus.NOT_FOUND
except Exception as e:
LOGGER.error(f'Internal server error occurred when getting feature lineage: {e}')
return {'message': f'Exception raised when getting lineage: {e}'}, HTTPStatus.INTERNAL_SERVER_ERROR
return {'message': f'Exception raised when getting feature lineage: {e}'}, HTTPStatus.INTERNAL_SERVER_ERROR


class FeatureStatsAPI(Resource):
Expand Down
2 changes: 1 addition & 1 deletion metadata/metadata_service/api/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def get(self, id: str) -> Iterable[Union[Mapping, int, None]]:
schema = LineageSchema()
return schema.dump(lineage), HTTPStatus.OK
except Exception as e:
return {'message': f'Exception raised when getting lineage: {e}'}, HTTPStatus.NOT_FOUND
return {'message': f'Exception raised when getting table lineage: {e}'}, HTTPStatus.NOT_FOUND


class TableOwnerAPI(Resource):
Expand Down
14 changes: 10 additions & 4 deletions metadata/metadata_service/proxy/neo4j_fabric_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,24 @@ def _prepare_federated_query_statement(self, statement: str, resource_type: Reso
return federated_statement

def _get_fabric_query_statement(self, fabric_db_name: str, statement: str) -> str:
# fabric_statement = textwrap.dedent(f"""
# UNWIND {fabric_db_name}.graphIds() AS graphId
# CALL {{
# USE {fabric_db_name}.graph(graphId)
# {statement.replace(';','')}
# }}
# {self._prepare_federated_return_statement(statement=statement)}
# """)
fabric_statement = textwrap.dedent(f"""
UNWIND {fabric_db_name}.graphIds() AS graphId
UNWIND graph.names() AS graphName
CALL {{
USE {fabric_db_name}.graph(graphId)
USE graph.byName(graphName)
{statement.replace(';','')}
}}
{self._prepare_federated_return_statement(statement=statement)}
""")
LOGGER.info(f"_fabric_query_statement={fabric_statement}")
return fabric_statement


########################## OVERRIDE ##########################


Expand Down
Loading
Loading