feat(ingest): support incremental column-level lineage (#10090)

hsheth2 authored Mar 21, 2024
1 parent 6c3834b commit 8c21b17

Showing 11 changed files with 114 additions and 471 deletions.
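At its core, this commit replaces read-modify-write merging of the upstreamLineage aspect (which required a live DataHubGraph connection) with patch-style metadata change proposals built via DatasetPatchBuilder, now covering column-level (fine-grained) lineage as well as table-level upstreams. A minimal sketch of what the patch path emits; the platform, dataset, and column names are invented for illustration:

from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
    UpstreamClass,
)
from datahub.specific.dataset import DatasetPatchBuilder

# Hypothetical upstream/downstream datasets.
upstream = make_dataset_urn("bigquery", "proj.raw.events")
downstream = make_dataset_urn("bigquery", "proj.mart.daily_events")

patch_builder = DatasetPatchBuilder(downstream)
# Table-level upstream entry.
patch_builder.add_upstream_lineage(
    UpstreamClass(dataset=upstream, type=DatasetLineageTypeClass.TRANSFORMED)
)
# Column-level (fine-grained) entry: event_ts -> event_date.
patch_builder.add_fine_grained_upstream_lineage(
    FineGrainedLineageClass(
        upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
        upstreams=[make_schema_field_urn(upstream, "event_ts")],
        downstreams=[make_schema_field_urn(downstream, "event_date")],
    )
)
for mcp in patch_builder.build():
    print(mcp)  # patch MCPs that add/update entries instead of replacing the aspect

Because each proposal patches individual entries server-side, sources no longer need to fetch and merge the existing aspect, which is why the read-modify-write helpers below could be deleted.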
@@ -1,18 +1,14 @@
-import copy
-from typing import Dict, Iterable, Optional
+from typing import Iterable, Optional
 
 from pydantic.fields import Field
 
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.mce_builder import datahub_guid, set_aspect
-from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.graph.client import DataHubGraph
 from datahub.metadata.schema_classes import (
     FineGrainedLineageClass,
     MetadataChangeEventClass,
     SystemMetadataClass,
-    UpstreamClass,
     UpstreamLineageClass,
 )
 from datahub.specific.dataset import DatasetPatchBuilder
@@ -29,7 +25,7 @@ def convert_upstream_lineage_to_patch(
     for fine_upstream in aspect.fineGrainedLineages or []:
         patch_builder.add_fine_grained_upstream_lineage(fine_upstream)
     mcp = next(iter(patch_builder.build()))
-    return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp)
+    return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp)
 
 
 def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
@@ -42,73 +38,7 @@ def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
     )
 
 
-def _merge_upstream_lineage(
-    new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass
-) -> UpstreamLineageClass:
-    merged_aspect = copy.deepcopy(gms_aspect)
-
-    upstreams_map: Dict[str, UpstreamClass] = {
-        upstream.dataset: upstream for upstream in merged_aspect.upstreams
-    }
-
-    upstreams_updated = False
-    fine_upstreams_updated = False
-
-    for table_upstream in new_aspect.upstreams:
-        if table_upstream.dataset not in upstreams_map or (
-            table_upstream.auditStamp.time
-            > upstreams_map[table_upstream.dataset].auditStamp.time
-        ):
-            upstreams_map[table_upstream.dataset] = table_upstream
-            upstreams_updated = True
-
-    if upstreams_updated:
-        merged_aspect.upstreams = list(upstreams_map.values())
-
-    if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages:
-        fine_upstreams_map: Dict[str, FineGrainedLineageClass] = {
-            get_fine_grained_lineage_key(fine_upstream): fine_upstream
-            for fine_upstream in merged_aspect.fineGrainedLineages
-        }
-        for column_upstream in new_aspect.fineGrainedLineages:
-            column_upstream_key = get_fine_grained_lineage_key(column_upstream)
-
-            if column_upstream_key not in fine_upstreams_map or (
-                column_upstream.confidenceScore
-                > fine_upstreams_map[column_upstream_key].confidenceScore
-            ):
-                fine_upstreams_map[column_upstream_key] = column_upstream
-                fine_upstreams_updated = True
-
-        if fine_upstreams_updated:
-            merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values())
-    else:
-        merged_aspect.fineGrainedLineages = (
-            new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages
-        )
-
-    return merged_aspect
-
-
-def _lineage_wu_via_read_modify_write(
-    graph: DataHubGraph,
-    urn: str,
-    aspect: UpstreamLineageClass,
-    system_metadata: Optional[SystemMetadataClass],
-) -> MetadataWorkUnit:
-    gms_aspect = graph.get_aspect(urn, UpstreamLineageClass)
-    if gms_aspect:
-        new_aspect = _merge_upstream_lineage(aspect, gms_aspect)
-    else:
-        new_aspect = aspect
-
-    return MetadataChangeProposalWrapper(
-        entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata
-    ).as_workunit()
-
-
 def auto_incremental_lineage(
-    graph: Optional[DataHubGraph],
     incremental_lineage: bool,
     stream: Iterable[MetadataWorkUnit],
 ) -> Iterable[MetadataWorkUnit]:
@@ -117,35 +47,23 @@ def auto_incremental_lineage(
         return  # early exit
 
     for wu in stream:
-        urn = wu.get_urn()
-
         lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type(
             UpstreamLineageClass
         )
+        urn = wu.get_urn()
+        if isinstance(wu.metadata, MetadataChangeEventClass):
+            set_aspect(
+                wu.metadata, None, UpstreamLineageClass
+            )  # we'll handle upstreamLineage separately below
+            if len(wu.metadata.proposedSnapshot.aspects) > 0:
+                yield wu
+
         if lineage_aspect:
-            if isinstance(wu.metadata, MetadataChangeEventClass):
-                set_aspect(
-                    wu.metadata, None, UpstreamLineageClass
-                )  # we'll emit upstreamLineage separately below
-                if len(wu.metadata.proposedSnapshot.aspects) > 0:
-                    yield wu
-
-            # TODO: Replace with CLL patch now that we have support for it.
-            if lineage_aspect.fineGrainedLineages:
-                if graph is None:
-                    raise ValueError(
-                        "Failed to handle incremental lineage, DataHubGraph is missing. "
-                        "Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
-                    )
-                yield _lineage_wu_via_read_modify_write(
-                    graph, urn, lineage_aspect, wu.metadata.systemMetadata
-                )
-            elif lineage_aspect.upstreams:
+            if lineage_aspect.upstreams:
                 yield convert_upstream_lineage_to_patch(
                     urn, lineage_aspect, wu.metadata.systemMetadata
                 )
         else:
             yield wu
 
 
 class IncrementalLineageConfigMixin(ConfigModel):
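With the graph dependency gone, auto_incremental_lineage is a pure transform over the workunit stream. A rough usage sketch under the new two-argument signature; the workunit contents are invented, and the import path matches the one added to the BigQuery source below:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# A one-workunit stream carrying a full upstreamLineage aspect.
aspect = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(
            dataset=make_dataset_urn("redshift", "db.schema.src"),
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
    ]
)
wu = MetadataChangeProposalWrapper(
    entityUrn=make_dataset_urn("redshift", "db.schema.dst"), aspect=aspect
).as_workunit()

# With the flag on, lineage workunits come out converted to patches;
# with it off, the stream passes through untouched.
for out in auto_incremental_lineage(True, [wu]):
    print(out.id)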
@@ -1,4 +1,5 @@
 import atexit
+import functools
 import logging
 import os
 import re
@@ -27,6 +28,7 @@
     platform_name,
     support_status,
 )
+from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
 from datahub.ingestion.api.source import (
     CapabilityReport,
     MetadataWorkUnitProcessor,
@@ -167,11 +169,6 @@ def cleanup(config: BigQueryV2Config) -> None:
     SourceCapability.USAGE_STATS,
     "Enabled by default, can be disabled via configuration `include_usage_statistics`",
 )
-@capability(
-    SourceCapability.DELETION_DETECTION,
-    "Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
-    supported=True,
-)
 @capability(
     SourceCapability.CLASSIFICATION,
     "Optionally enabled via `classification.enabled`",
@@ -574,6 +571,9 @@ def gen_dataset_containers(
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
+            ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx
             ).workunit_processor,
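Each source now binds only the config flag with functools.partial, leaving the stream argument open; the framework then chains the resulting processors over the workunit stream. A sketch of that convention, which mirrors rather than reproduces DataHub's internal MetadataWorkUnitProcessor plumbing:

import functools
from typing import Callable, Iterable, Optional

from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.workunit import MetadataWorkUnit

Processor = Callable[[Iterable[MetadataWorkUnit]], Iterable[MetadataWorkUnit]]

def apply_processors(
    stream: Iterable[MetadataWorkUnit],
    processors: Iterable[Optional[Processor]],
) -> Iterable[MetadataWorkUnit]:
    # Chain each non-None processor over the stream, as the framework does
    # with the list returned by get_workunit_processors().
    for processor in processors:
        if processor is not None:
            stream = processor(stream)
    return stream

# functools.partial(auto_incremental_lineage, True) has the Processor shape:
# calling it with a stream is auto_incremental_lineage(True, stream).
incremental = functools.partial(auto_incremental_lineage, True)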
@@ -3,7 +3,6 @@
 from datetime import timedelta
 from typing import Any, Dict, List, Optional
 
-import pydantic
 from google.cloud import bigquery
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator
@@ -212,21 +211,11 @@ class BigQueryV2Config(
     )
 
     extract_column_lineage: bool = Field(
-        # TODO: Flip this default to True once we support patching column-level lineage.
         default=False,
         description="If enabled, generate column level lineage. "
-        "Requires lineage_use_sql_parser to be enabled. "
-        "This and `incremental_lineage` cannot both be enabled.",
+        "Requires lineage_use_sql_parser to be enabled.",
     )
 
-    @pydantic.validator("extract_column_lineage")
-    def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
-        if v and values.get("incremental_lineage"):
-            raise ValueError(
-                "Cannot enable `extract_column_lineage` and `incremental_lineage` at the same time."
-            )
-        return v
-
     extract_lineage_from_catalog: bool = Field(
         default=False,
         description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",
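Dropping the validator means extract_column_lineage and incremental_lineage may now be enabled together. A hypothetical recipe fragment in dict form; connection settings and the rest of the BigQuery config are omitted:

# Field names come from BigQueryV2Config; everything else is elided.
bigquery_source_config = {
    "incremental_lineage": True,     # emit lineage aspects as patches
    "extract_column_lineage": True,  # previously mutually exclusive with the above
    "lineage_use_sql_parser": True,  # still required for column-level lineage
}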
@@ -108,7 +108,7 @@ class RedshiftConfig(
     )
 
     use_lineage_v2: bool = Field(
-        default=False,
+        default=True,
         description="Whether to use the new SQL-based lineage collector.",
     )
     lineage_v2_generate_queries: bool = Field(
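The flipped default makes the SQL-based lineage collector opt-out for Redshift. A toy pydantic model illustrating the behavior change; this is a stand-in, not the real RedshiftConfig, which has many more fields:

from pydantic import BaseModel, Field

class LineageFlags(BaseModel):
    use_lineage_v2: bool = Field(
        default=True,  # was False before this commit
        description="Whether to use the new SQL-based lineage collector.",
    )

assert LineageFlags().use_lineage_v2                          # on by default now
assert not LineageFlags(use_lineage_v2=False).use_lineage_v2  # still opt-out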
@@ -1,7 +1,7 @@
+import functools
 import itertools
 import logging
 from collections import defaultdict
-from functools import partial
 from typing import Dict, Iterable, List, Optional, Type, Union
 
 import humanfriendly
@@ -402,10 +402,8 @@ def gen_database_container(self, database: str) -> Iterable[MetadataWorkUnit]:
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
-            partial(
-                auto_incremental_lineage,
-                self.ctx.graph,
-                self.config.incremental_lineage,
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
             ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx
@@ -1,10 +1,10 @@
+import functools
 import json
 import logging
 import os
 import os.path
 import platform
 from dataclasses import dataclass
-from functools import partial
 from typing import Callable, Dict, Iterable, List, Optional, Union
 
 from snowflake.connector import SnowflakeConnection
@@ -510,10 +510,8 @@ def query(query):
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
-            partial(
-                auto_incremental_lineage,
-                self.ctx.graph,
-                self.config.incremental_lineage,
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
            ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx
@@ -1,5 +1,6 @@
 import contextlib
 import datetime
+import functools
 import logging
 import traceback
 from dataclasses import dataclass, field
@@ -514,10 +515,8 @@ def get_schema_level_workunits(
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
-            partial(
-                auto_incremental_lineage,
-                self.ctx.graph,
-                self.config.incremental_lineage,
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
            ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx
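All of these sources read the flag from IncrementalLineageConfigMixin, which is truncated in the first file above. A sketch of how a source config would compose the mixin, assuming the mixin declares incremental_lineage as a boolean defaulting to False; MySourceConfig and table_pattern are hypothetical:

from datahub.ingestion.api.incremental_lineage_helper import (
    IncrementalLineageConfigMixin,
)

class MySourceConfig(IncrementalLineageConfigMixin):
    # Inherits `incremental_lineage`; sources pass self.config.incremental_lineage
    # to auto_incremental_lineage via functools.partial, as in the diffs above.
    table_pattern: str = ".*"  # hypothetical source-specific option

config = MySourceConfig.parse_obj({"incremental_lineage": True})
assert config.incremental_lineage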

This file was deleted.
