Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): support domains in meta -> "datahub" section #10967

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@
"numpy<2",
}

# Dependencies shared by the "dbt" and "dbt-cloud" plugins:
# everything in sqlglot_lib plus more_itertools.
dbt_common = sqlglot_lib | {"more_itertools"}

sql_common = (
{
# Required for all SQL sources.
Expand Down Expand Up @@ -352,8 +357,8 @@
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"delta-lake": {*data_lake_profiling, *delta_lake},
"dbt": {"requests"} | sqlglot_lib | aws_common,
"dbt-cloud": {"requests"} | sqlglot_lib,
"dbt": {"requests"} | dbt_common | aws_common,
"dbt-cloud": {"requests"} | dbt_common,
"druid": sql_common | {"pydruid>=0.6.2"},
"dynamodb": aws_common | classification_lib,
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If it's not, it throws
Expand Down
13 changes: 10 additions & 3 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)

import typing_inspect
from avrogen.dict_wrapper import DictWrapper

from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -412,9 +413,9 @@ def make_lineage_mce(
return mce


def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
SnapshotType = type(mce.proposedSnapshot)

def can_add_aspect_to_snapshot(
SnapshotType: Type[DictWrapper], AspectType: Type[Aspect]
) -> bool:
constructor_annotations = get_type_hints(SnapshotType.__init__)
aspect_list_union = typing_inspect.get_args(constructor_annotations["aspects"])[0]

Expand All @@ -423,6 +424,12 @@ def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> b
return issubclass(AspectType, supported_aspect_types)


def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
    """Report whether ``AspectType`` can be added to the snapshot carried by ``mce``.

    The check is delegated to ``can_add_aspect_to_snapshot`` using the
    runtime class of ``mce.proposedSnapshot``.
    """
    return can_add_aspect_to_snapshot(type(mce.proposedSnapshot), AspectType)


def assert_can_add_aspect(
mce: MetadataChangeEventClass, AspectType: Type[Aspect]
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from enum import auto
from typing import Any, Dict, Iterable, List, Optional, Tuple

import more_itertools
import pydantic
from pydantic import root_validator, validator
from pydantic.fields import Field
Expand Down Expand Up @@ -1309,8 +1310,23 @@ def create_dbt_platform_mces(
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

standalone_aspects, snapshot_aspects = more_itertools.partition(
(
lambda aspect: mce_builder.can_add_aspect_to_snapshot(
DatasetSnapshot, type(aspect)
)
),
aspects,
)
for aspect in standalone_aspects:
# The domains aspect, and some others, may not support being added to the snapshot.
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=aspect,
).as_workunit()

dataset_snapshot = DatasetSnapshot(
urn=node_datahub_urn, aspects=aspects
urn=node_datahub_urn, aspects=list(snapshot_aspects)
)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
Expand Down Expand Up @@ -1588,6 +1604,10 @@ def _generate_base_dbt_aspects(
):
aspects.append(meta_aspects.get(Constants.ADD_TERM_OPERATION))

# add meta domains aspect
if meta_aspects.get(Constants.ADD_DOMAIN_OPERATION):
aspects.append(meta_aspects.get(Constants.ADD_DOMAIN_OPERATION))

# add meta links aspect
meta_links_aspect = meta_aspects.get(Constants.ADD_DOC_LINK_OPERATION)
if meta_links_aspect and self.config.enable_meta_mapping:
Expand Down
45 changes: 33 additions & 12 deletions metadata-ingestion/src/datahub/utilities/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from datahub.metadata.schema_classes import (
AuditStampClass,
DomainsClass,
InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass,
OwnerClass,
Expand Down Expand Up @@ -70,6 +71,8 @@ class Constants:
ADD_TERM_OPERATION = "add_term"
ADD_TERMS_OPERATION = "add_terms"
ADD_OWNER_OPERATION = "add_owner"
ADD_DOMAIN_OPERATION = "add_domain"

OPERATION = "operation"
OPERATION_CONFIG = "config"
TAG = "tag"
Expand All @@ -94,9 +97,15 @@ class _MappingOwner(ConfigModel):


class _DatahubProps(ConfigModel):
owners: List[Union[str, _MappingOwner]]
tags: Optional[List[str]] = None
terms: Optional[List[str]] = None
owners: Optional[List[Union[str, _MappingOwner]]] = None
domain: Optional[str] = None

def make_owner_category_list(self) -> List[Dict]:
if self.owners is None:
return []

res = []
for owner in self.owners:
if isinstance(owner, str):
Expand Down Expand Up @@ -176,26 +185,29 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # noqa: C901
# Process the special "datahub" property, which supports tags, terms, owners, and domain.
operations_map: Dict[str, list] = {}
try:
datahub_prop = raw_props.get("datahub")
if datahub_prop and isinstance(datahub_prop, dict):
if datahub_prop.get("tags"):
raw_datahub_prop = raw_props.get("datahub")
if raw_datahub_prop:
datahub_prop = _DatahubProps.parse_obj_allow_extras(raw_datahub_prop)
if datahub_prop.tags:
# Note that tags get converted to urns later because we need to support the tag prefix.
tags = datahub_prop["tags"]
operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend(
tags
datahub_prop.tags
)

if datahub_prop.get("terms"):
terms = datahub_prop["terms"]
if datahub_prop.terms:
operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend(
mce_builder.make_term_urn(term) for term in terms
mce_builder.make_term_urn(term) for term in datahub_prop.terms
)

if datahub_prop.get("owners"):
owners = _DatahubProps.parse_obj_allow_extras(datahub_prop)
if datahub_prop.owners:
operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend(
owners.make_owner_category_list()
datahub_prop.make_owner_category_list()
)

if datahub_prop.domain:
operations_map.setdefault(
Constants.ADD_DOMAIN_OPERATION, []
).append(mce_builder.make_domain_urn(datahub_prop.domain))
except Exception as e:
logger.error(f"Error while processing datahub property: {e}")

Expand Down Expand Up @@ -299,6 +311,15 @@ def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]:
)
aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect

if Constants.ADD_DOMAIN_OPERATION in operation_map:
domain_aspect = DomainsClass(
domains=[
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to check whether the domain exists?
We do this in our SQL sources ->

self.domain_registry: Optional[DomainRegistry] = None

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the SQL sources we try to map name -> domain. Here, we only support domain URNs, so we don't need the DomainRegistry.

mce_builder.make_domain_urn(domain)
for domain in operation_map[Constants.ADD_DOMAIN_OPERATION]
]
)
aspect_map[Constants.ADD_DOMAIN_OPERATION] = domain_aspect

if Constants.ADD_DOC_LINK_OPERATION in operation_map:
try:
if len(
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/tests/unit/test_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
from datahub.metadata.schema_classes import (
DomainsClass,
GlobalTagsClass,
GlossaryTermsClass,
InstitutionalMemoryClass,
Expand Down Expand Up @@ -366,6 +367,7 @@ def test_operation_processor_datahub_props():
"owner_type": "urn:li:ownershipType:steward",
},
],
"domain": "domain1",
}
}

Expand Down Expand Up @@ -396,3 +398,6 @@ def test_operation_processor_datahub_props():
assert [
term_association.urn for term_association in aspect_map["add_term"].terms
] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"]

assert isinstance(aspect_map["add_domain"], DomainsClass)
assert aspect_map["add_domain"].domains == ["urn:li:domain:domain1"]
Loading