Skip to content

Commit

Permalink
fix(ingest/snowflake): add limits on tables/columns/queries in lineage
Browse files: browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and aviv-julienjehannet committed Jul 17, 2024
1 parent 0117d82 commit 482c236
Showing 1 changed file with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,9 @@ def table_upstreams_with_column_lineage(
upstreams_deny_pattern,
["upstream_table_name", "upstream_column_table_name"],
)
_MAX_UPSTREAMS_PER_DOWNSTREAM = 20
_MAX_UPSTREAM_COLUMNS_PER_DOWNSTREAM = 400
_MAX_QUERIES_PER_DOWNSTREAM = 30

return f"""
WITH column_lineage_history AS (
Expand Down Expand Up @@ -777,7 +780,7 @@ def table_upstreams_with_column_lineage(
queries AS (
select qid.downstream_table_name, qid.query_id, query_history.query_text, query_history.start_time
from query_ids qid
LEFT JOIN (
JOIN (
SELECT * FROM snowflake.account_usage.query_history
WHERE query_history.start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_history.start_time < to_timestamp_ltz({end_time_millis}, 3)
Expand All @@ -787,26 +790,26 @@ def table_upstreams_with_column_lineage(
SELECT
h.downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
ANY_VALUE(h.downstream_table_domain) AS "DOWNSTREAM_TABLE_DOMAIN",
ARRAY_UNIQUE_AGG(
ARRAY_SLICE(ARRAY_UNIQUE_AGG(
OBJECT_CONSTRUCT(
'upstream_object_name', h.upstream_table_name,
'upstream_object_domain', h.upstream_table_domain,
'query_id', h.query_id
)
) AS "UPSTREAM_TABLES",
ARRAY_UNIQUE_AGG(
), 0, {_MAX_UPSTREAMS_PER_DOWNSTREAM}) AS "UPSTREAM_TABLES",
ARRAY_SLICE(ARRAY_UNIQUE_AGG(
OBJECT_CONSTRUCT(
'column_name', column_upstreams.downstream_column_name,
'upstreams', column_upstreams.upstreams
'column_name', column_upstreams.downstream_column_name,
'upstreams', column_upstreams.upstreams
)
) AS "UPSTREAM_COLUMNS",
ARRAY_UNIQUE_AGG(
), 0, {_MAX_UPSTREAM_COLUMNS_PER_DOWNSTREAM}) AS "UPSTREAM_COLUMNS",
ARRAY_SLICE(ARRAY_UNIQUE_AGG(
OBJECT_CONSTRUCT(
'query_id', q.query_id,
'query_text', q.query_text,
'start_time', q.start_time
)
) as "QUERIES"
), 0, {_MAX_QUERIES_PER_DOWNSTREAM}) as "QUERIES"
FROM
table_upstream_jobs_unique h
LEFT JOIN column_upstreams column_upstreams
Expand Down

0 comments on commit 482c236

Please sign in to comment.