Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/dbt): generate CLL for all node types #9964

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def make_ts_millis(ts: datetime) -> int:


def make_ts_millis(ts: Optional[datetime]) -> Optional[int]:
# TODO: This duplicates the functionality of datetime_to_ts_millis
if ts is None:
return None
return int(ts.timestamp() * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def convert_upstream_lineage_to_patch(
patch_builder = DatasetPatchBuilder(urn, system_metadata)
for upstream in aspect.upstreams:
patch_builder.add_upstream_lineage(upstream)
for fine_upstream in aspect.fineGrainedLineages or []:
patch_builder.add_fine_grained_upstream_lineage(fine_upstream)
mcp = next(iter(patch_builder.build()))
return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp)

Expand Down Expand Up @@ -125,6 +127,7 @@ def auto_incremental_lineage(
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

# TODO: Replace with CLL patch now that we have support for it.
if lineage_aspect.fineGrainedLineages:
if graph is None:
raise ValueError(
Expand Down
136 changes: 84 additions & 52 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
UpstreamClass,
UpstreamLineage,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
Expand Down Expand Up @@ -123,6 +122,8 @@
logger = logging.getLogger(__name__)
DBT_PLATFORM = "dbt"

_DEFAULT_ACTOR = mce_builder.make_user_urn("unknown")


@dataclass
class DBTSourceReport(StaleEntityRemovalSourceReport):
Expand Down Expand Up @@ -634,18 +635,37 @@ def get_upstreams(
return upstream_urns


def get_upstream_lineage(upstream_urns: List[str]) -> UpstreamLineage:
ucl: List[UpstreamClass] = []

for dep in upstream_urns:
uc = UpstreamClass(
dataset=dep,
type=DatasetLineageTypeClass.TRANSFORMED,
)
uc.auditStamp.time = int(datetime.utcnow().timestamp() * 1000)
ucl.append(uc)
def make_mapping_upstream_lineage(
upstream_urn: str, downstream_urn: str, node: DBTNode
) -> UpstreamLineageClass:
cll = None
if node.columns:
cll = [
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
mce_builder.make_schema_field_urn(upstream_urn, column.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the column name always the same for up and downstream?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is specifically for dbt sources, where it is a 1:1 mapping

],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
mce_builder.make_schema_field_urn(downstream_urn, column.name)
],
)
for column in node.columns
]

return UpstreamLineage(upstreams=ucl)
return UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=upstream_urn,
type=DatasetLineageTypeClass.COPY,
auditStamp=AuditStamp(
time=mce_builder.get_sys_time(), actor=_DEFAULT_ACTOR
),
)
],
fineGrainedLineages=cll,
)


# See https://github.com/fishtown-analytics/dbt/blob/master/core/dbt/adapters/sql/impl.py
Expand Down Expand Up @@ -1086,7 +1106,6 @@ def create_platform_mces(
self.config.strip_user_ids_from_email,
)
for node in sorted(dbt_nodes, key=lambda n: n.dbt_name):
is_primary_source = mce_platform == DBT_PLATFORM
node_datahub_urn = node.get_urn(
mce_platform,
self.config.env,
Expand All @@ -1097,11 +1116,6 @@ def create_platform_mces(
f"Skipping emission of node {node_datahub_urn} because node_type {node.node_type} is disabled"
)
continue
if not is_primary_source:
# We previously, erroneously added non-dbt nodes to the state object.
# This call ensures that we don't try to soft-delete them after an
# upgrade of acryl-datahub.
self.stale_entity_removal_handler.add_urn_to_skip(node_datahub_urn)

meta_aspects: Dict[str, Any] = {}
if self.config.enable_meta_mapping and node.meta:
Expand Down Expand Up @@ -1134,9 +1148,17 @@ def create_platform_mces(
if sub_type_wu:
yield sub_type_wu

else:
if len(aspects) == 0:
continue
dataset_snapshot = DatasetSnapshot(
urn=node_datahub_urn, aspects=aspects
)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
mce = self.get_patched_mce(mce)
yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce)
else: # mce_platform != DBT_PLATFORM:
# We are creating empty node for platform and only add lineage/keyaspect.
aspects = []
if not node.exists_in_target_platform:
continue

Expand All @@ -1149,28 +1171,25 @@ def create_platform_mces(
self.config.env,
self.config.platform_instance,
)
upstreams_lineage_class = get_upstream_lineage([upstream_dbt_urn])
if not is_primary_source and self.config.incremental_lineage:
upstreams_lineage_class = make_mapping_upstream_lineage(
upstream_urn=upstream_dbt_urn,
downstream_urn=node_datahub_urn,
node=node,
)
if self.config.incremental_lineage:
# We only generate incremental lineage for non-dbt nodes.
wu = convert_upstream_lineage_to_patch(
urn=node_datahub_urn,
aspect=upstreams_lineage_class,
system_metadata=None,
)
wu.is_primary_source = is_primary_source
wu.is_primary_source = False
yield wu
else:
aspects.append(upstreams_lineage_class)

if len(aspects) == 0:
continue
dataset_snapshot = DatasetSnapshot(urn=node_datahub_urn, aspects=aspects)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
mce = self.get_patched_mce(mce)
yield MetadataWorkUnit(
id=dataset_snapshot.urn, mce=mce, is_primary_source=is_primary_source
)
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=upstreams_lineage_class,
).as_workunit()

def extract_query_tag_aspects(
self,
Expand Down Expand Up @@ -1488,17 +1507,24 @@ def _create_lineage_aspect_for_dbt_node(
This method creates lineage amongst dbt nodes. A dbt node can be linked to other dbt nodes or a platform node.
"""

node_urn = node.get_urn(
target_platform=DBT_PLATFORM,
env=self.config.env,
data_platform_instance=self.config.platform_instance,
)

# if a node is of type source in dbt, its upstream lineage should have the corresponding table/view
# from the platform. This code block is executed when we are generating entities of type "dbt".
if node.node_type == "source":
upstream_urns = [
node.get_urn(
return make_mapping_upstream_lineage(
upstream_urn=node.get_urn(
self.config.target_platform,
self.config.env,
self.config.target_platform_instance,
)
]
cll = None
),
downstream_urn=node_urn,
node=node,
)
else:
upstream_urns = get_upstreams(
node.upstream_nodes,
Expand All @@ -1509,12 +1535,6 @@ def _create_lineage_aspect_for_dbt_node(
self.config.platform_instance,
)

node_urn = node.get_urn(
target_platform=DBT_PLATFORM,
env=self.config.env,
data_platform_instance=self.config.platform_instance,
)

def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
return all_nodes_map[dbt_name].get_urn_for_upstream_lineage(
dbt_platform_instance=self.config.platform_instance,
Expand Down Expand Up @@ -1553,14 +1573,26 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
)
]

if upstream_urns:
upstreams_lineage_class = get_upstream_lineage(upstream_urns)
if not upstream_urns:
return None

if self.config.include_column_lineage and cll:
upstreams_lineage_class.fineGrainedLineages = cll

return upstreams_lineage_class
return None
auditStamp = AuditStamp(
time=mce_builder.get_sys_time(),
actor=_DEFAULT_ACTOR,
)
return UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=upstream,
type=DatasetLineageTypeClass.TRANSFORMED,
auditStamp=auditStamp,
)
for upstream in upstream_urns
],
fineGrainedLineages=(cll or None)
if self.config.include_column_lineage
else None,
)

# This method attempts to read-modify and return the owners of a dataset.
# From the existing owners it will remove the owners that are of the source_type_filter and
Expand Down
Loading
Loading