Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/looker): support platform instance for dashboards & charts #10771

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ecacb05
platform instance for looker dashboards and charts
sid-acryl Jun 25, 2024
8186bfe
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jun 26, 2024
e3adcd5
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jun 27, 2024
f9ba34d
include_looker_element_in_platform_instance is default to false
sid-acryl Jun 27, 2024
579cd8e
Merge branch 'cus2139-pi-in-looker-dash-chart' of github.com:sid-acry…
sid-acryl Jun 27, 2024
79fad87
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 1, 2024
ae6c239
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 1, 2024
d229612
address review comments
sid-acryl Jul 1, 2024
c530a4b
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 2, 2024
ddbf447
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 2, 2024
9240607
added platform instance aspect
sid-acryl Jul 2, 2024
71a146c
lint fix
sid-acryl Jul 2, 2024
ddb1ab7
Merge branch 'cus2139-pi-in-looker-dash-chart' of github.com:sid-acry…
sid-acryl Jul 2, 2024
ecca269
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 3, 2024
afad354
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 4, 2024
3692401
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 8, 2024
80b7e0e
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 8, 2024
1dd5802
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 14, 2024
02b3af0
address review comments
sid-acryl Jul 15, 2024
7d64b7a
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 16, 2024
c950d3d
update make_dashboard_urn
sid-acryl Jul 16, 2024
105d49f
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 16, 2024
6853a1a
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 18, 2024
37ad551
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 19, 2024
9b97cb7
emit dataplatformInstance entity
sid-acryl Jul 19, 2024
efbf0cf
resolve conflict
sid-acryl Jul 25, 2024
1cf0ff0
Merge branch 'master' into cus2139-pi-in-looker-dash-chart
sid-acryl Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,17 @@ class LookerDashboardSourceConfig(
)
extract_independent_looks: bool = Field(
False,
description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion should also be enabled.",
description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion "
"should also be enabled.",
)
emit_used_explores_only: bool = Field(
True,
description="When enabled, only explores that are used by a Dashboard/Look will be ingested.",
)
include_platform_instance_in_urns: bool = Field(
False,
description="When enabled, platform instance will be added in dashboard and chart urn.",
sid-acryl marked this conversation as resolved.
Show resolved Hide resolved
)

@validator("external_base_url", pre=True, always=True)
def external_url_defaults_to_api_config_base_url(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
DataPlatformInstance,
Status,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
Expand All @@ -95,11 +96,13 @@
ChartTypeClass,
ContainerClass,
DashboardInfoClass,
DataPlatformInfoClass,
InputFieldClass,
InputFieldsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
PlatformTypeClass,
SubTypesClass,
)
from datahub.utilities.backpressure_aware_executor import BackpressureAwareExecutor
Expand Down Expand Up @@ -624,15 +627,47 @@ def _get_folder_browse_path_v2_entries(
if include_current_folder:
yield BrowsePathEntryClass(id=urn, urn=urn)

def _create_platform_instance_aspect(
self,
) -> DataPlatformInstance:

assert (
self.source_config.platform_name
), "Platform name is not set in the configuration."
assert (
self.source_config.platform_instance
), "Platform instance is not set in the configuration."

return DataPlatformInstance(
platform=builder.make_data_platform_urn(self.source_config.platform_name),
instance=builder.make_dataplatform_instance_urn(
platform=self.source_config.platform_name,
instance=self.source_config.platform_instance,
),
)

def _make_chart_urn(self, element_id: str) -> str:

platform_instance: Optional[str] = None

if self.source_config.include_platform_instance_in_urns:
platform_instance = self.source_config.platform_instance

return builder.make_chart_urn(
name=element_id,
platform=self.source_config.platform_name,
platform_instance=platform_instance,
)

def _make_chart_metadata_events(
self,
dashboard_element: LookerDashboardElement,
dashboard: Optional[
LookerDashboard
], # dashboard will be None if this is a standalone look
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
chart_urn = self._make_chart_urn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider refactoring _make_chart_metadata_events for readability and maintainability.

The function is quite large and could benefit from breaking it down into smaller helper methods.

- def _make_chart_metadata_events(
-     self,
-     dashboard_element: LookerDashboardElement,
-     dashboard: Optional[LookerDashboard]
- ) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
-     ...
+ def _make_chart_metadata_events(
+     self,
+     dashboard_element: LookerDashboardElement,
+     dashboard: Optional[LookerDashboard]
+ ) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
+     chart_urn = self._make_chart_urn(
+         element_id=dashboard_element.get_urn_element_id()
+     )
+     chart_snapshot = self._create_chart_snapshot(
+         chart_urn, dashboard_element, dashboard
+     )
+     proposals = self._create_chart_proposals(chart_snapshot, dashboard_element, dashboard)
+     return proposals

+ def _create_chart_snapshot(
+     self,
+     chart_urn: str,
+     dashboard_element: LookerDashboardElement,
+     dashboard: Optional[LookerDashboard]
+ ) -> ChartSnapshot:
+     chart_snapshot = ChartSnapshot(
+         urn=chart_urn,
+         aspects=[Status(removed=False)],
+     )
+     chart_info = ChartInfoClass(
+         type=self._get_chart_type(dashboard_element),
+         description=dashboard_element.description or "",
+         title=dashboard_element.title or "",
+         lastModified=ChangeAuditStamps(),
+         chartUrl=dashboard_element.url(self.source_config.external_base_url or ""),
+         inputs=dashboard_element.get_view_urns(self.source_config),
+         customProperties={
+             "upstream_fields": (
+                 ",".join(
+                     sorted({field.name for field in dashboard_element.input_fields})
+                 )
+                 if dashboard_element.input_fields
+                 else ""
+             )
+         },
+     )
+     chart_snapshot.aspects.append(chart_info)
+     self._add_browse_paths(chart_snapshot, dashboard, dashboard_element)
+     self._add_ownership(chart_snapshot, dashboard, dashboard_element)
+     return chart_snapshot

+ def _create_chart_proposals(
+     self,
+     chart_snapshot: ChartSnapshot,
+     dashboard_element: LookerDashboardElement,
+     dashboard: Optional[LookerDashboard]
+ ) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
+     proposals = [
+         MetadataChangeEvent(proposedSnapshot=chart_snapshot),
+         MetadataChangeProposalWrapper(
+             entityUrn=chart_snapshot.urn,
+             aspect=SubTypesClass(typeNames=[BIAssetSubTypes.LOOKER_LOOK]),
+         ),
+     ]
+     if self.source_config.extract_embed_urls and self.source_config.external_base_url:
+         maybe_embed_url = dashboard_element.embed_url(self.source_config.external_base_url)
+         if maybe_embed_url:
+             proposals.append(
+                 create_embed_mcp(chart_snapshot.urn, maybe_embed_url)
+             )
+     if dashboard is None and dashboard_element.folder:
+         container = ContainerClass(
+             container=self._gen_folder_key(dashboard_element.folder.id).as_urn(),
+         )
+         proposals.append(
+             MetadataChangeProposalWrapper(entityUrn=chart_snapshot.urn, aspect=container)
+         )
+     if browse_path_v2:
+         proposals.append(
+             MetadataChangeProposalWrapper(
+                 entityUrn=chart_snapshot.urn, aspect=browse_path_v2
+             )
+         )
+     return proposals
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
chart_urn = self._make_chart_urn(
def _make_chart_metadata_events(
self,
dashboard_element: LookerDashboardElement,
dashboard: Optional[LookerDashboard]
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
)
chart_snapshot = self._create_chart_snapshot(
chart_urn, dashboard_element, dashboard
)
proposals = self._create_chart_proposals(chart_snapshot, dashboard_element, dashboard)
return proposals
def _create_chart_snapshot(
self,
chart_urn: str,
dashboard_element: LookerDashboardElement,
dashboard: Optional[LookerDashboard]
) -> ChartSnapshot:
chart_snapshot = ChartSnapshot(
urn=chart_urn,
aspects=[Status(removed=False)],
)
chart_info = ChartInfoClass(
type=self._get_chart_type(dashboard_element),
description=dashboard_element.description or "",
title=dashboard_element.title or "",
lastModified=ChangeAuditStamps(),
chartUrl=dashboard_element.url(self.source_config.external_base_url or ""),
inputs=dashboard_element.get_view_urns(self.source_config),
customProperties={
"upstream_fields": (
",".join(
sorted({field.name for field in dashboard_element.input_fields})
)
if dashboard_element.input_fields
else ""
)
},
)
chart_snapshot.aspects.append(chart_info)
self._add_browse_paths(chart_snapshot, dashboard, dashboard_element)
self._add_ownership(chart_snapshot, dashboard, dashboard_element)
return chart_snapshot
def _create_chart_proposals(
self,
chart_snapshot: ChartSnapshot,
dashboard_element: LookerDashboardElement,
dashboard: Optional[LookerDashboard]
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
proposals = [
MetadataChangeEvent(proposedSnapshot=chart_snapshot),
MetadataChangeProposalWrapper(
entityUrn=chart_snapshot.urn,
aspect=SubTypesClass(typeNames=[BIAssetSubTypes.LOOKER_LOOK]),
),
]
if self.source_config.extract_embed_urls and self.source_config.external_base_url:
maybe_embed_url = dashboard_element.embed_url(self.source_config.external_base_url)
if maybe_embed_url:
proposals.append(
create_embed_mcp(chart_snapshot.urn, maybe_embed_url)
)
if dashboard is None and dashboard_element.folder:
container = ContainerClass(
container=self._gen_folder_key(dashboard_element.folder.id).as_urn(),
)
proposals.append(
MetadataChangeProposalWrapper(entityUrn=chart_snapshot.urn, aspect=container)
)
if browse_path_v2:
proposals.append(
MetadataChangeProposalWrapper(
entityUrn=chart_snapshot.urn, aspect=browse_path_v2
)
)
return proposals

element_id=dashboard_element.get_urn_element_id()
)
chart_snapshot = ChartSnapshot(
urn=chart_urn,
Expand Down Expand Up @@ -713,6 +748,14 @@ def _make_chart_metadata_events(
),
]

if self.source_config.include_platform_instance_in_urns:
proposals.append(
MetadataChangeProposalWrapper(
entityUrn=chart_urn,
aspect=self._create_platform_instance_aspect(),
),
)

# If extracting embeds is enabled, produce an MCP for embed URL.
if (
self.source_config.extract_embed_urls
Expand Down Expand Up @@ -818,11 +861,26 @@ def _make_dashboard_metadata_events(
)
)

if self.source_config.include_platform_instance_in_urns:
proposals.append(
MetadataChangeProposalWrapper(
entityUrn=dashboard_urn,
aspect=self._create_platform_instance_aspect(),
)
)

return proposals

def make_dashboard_urn(self, looker_dashboard: LookerDashboard) -> str:
platform_instance: Optional[str] = None

if self.source_config.include_platform_instance_in_urns:
platform_instance = self.source_config.platform_instance

return builder.make_dashboard_urn(
self.source_config.platform_name, looker_dashboard.get_urn_dashboard_id()
name=looker_dashboard.get_urn_dashboard_id(),
platform=self.source_config.platform_name,
platform_instance=platform_instance,
)

def _make_explore_metadata_events(
Expand Down Expand Up @@ -1154,8 +1212,8 @@ def _input_fields_from_dashboard_element(

# enrich the input_fields with the fully hydrated ViewField from the now fetched explores
for input_field in input_fields:
entity_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
entity_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
Comment on lines +1215 to +1216
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider refactoring _input_fields_from_dashboard_element for readability and maintainability.

The function is quite large and could benefit from breaking it down into smaller helper methods.

- def _input_fields_from_dashboard_element(
-     self, dashboard_element: LookerDashboardElement
- ) -> List[InputFieldClass]:
-     ...
+ def _input_fields_from_dashboard_element(
+     self, dashboard_element: LookerDashboardElement
+ ) -> List[InputFieldClass]:
+     input_fields = (
+         dashboard_element.input_fields
+         if dashboard_element.input_fields is not None
+         else []
+     )
+     fields_for_mcp = []
+     for input_field in input_fields:
+         fields_for_mcp.extend(self._process_input_field(input_field, dashboard_element))
+     return fields_for_mcp

+ def _process_input_field(
+     self, input_field: InputFieldElement, dashboard_element: LookerDashboardElement
+ ) -> List[InputFieldClass]:
+     entity_urn = self._make_chart_urn(
+         element_id=dashboard_element.get_urn_element_id()
+     )
+     view_field_for_reference = input_field.view_field
+     if input_field.view_field is None:
+         explore = self.explore_registry.get_explore(
+             input_field.model, input_field.explore
+         )
+         if explore is not None:
+             self.add_reachable_explore(
+                 input_field.model, input_field.explore, entity_urn
+             )
+             entity_urn = explore.get_explore_urn(self.source_config)
+             explore_fields = (
+                 explore.fields if explore.fields is not None else []
+             )
+             relevant_field = next(
+                 (
+                     field
+                     for field in explore_fields
+                     if field.name == input_field.name
+                 ),
+                 None,
+             )
+             if relevant_field is not None:
+                 view_field_for_reference = relevant_field
+     if view_field_for_reference and view_field_for_reference.name:
+         return [
+             InputFieldClass(
+                 schemaFieldUrn=builder.make_schema_field_urn(
+                     entity_urn, view_field_for_reference.name
+                 ),
+                 schemaField=LookerUtil.view_field_to_schema_field(
+                     view_field_for_reference,
+                     self.reporter,
+                     self.source_config.tag_measures_and_dimensions,
+                 ),
+             )
+         ]
+     return []
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
entity_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
def _input_fields_from_dashboard_element(
self, dashboard_element: LookerDashboardElement
) -> List[InputFieldClass]:
input_fields = (
dashboard_element.input_fields
if dashboard_element.input_fields is not None
else []
)
fields_for_mcp = []
for input_field in input_fields:
fields_for_mcp.extend(self._process_input_field(input_field, dashboard_element))
return fields_for_mcp
def _process_input_field(
self, input_field: InputFieldElement, dashboard_element: LookerDashboardElement
) -> List[InputFieldClass]:
entity_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
)
view_field_for_reference = input_field.view_field
if input_field.view_field is None:
explore = self.explore_registry.get_explore(
input_field.model, input_field.explore
)
if explore is not None:
self.add_reachable_explore(
input_field.model, input_field.explore, entity_urn
)
entity_urn = explore.get_explore_urn(self.source_config)
explore_fields = (
explore.fields if explore.fields is not None else []
)
relevant_field = next(
(
field
for field in explore_fields
if field.name == input_field.name
),
None,
)
if relevant_field is not None:
view_field_for_reference = relevant_field
if view_field_for_reference and view_field_for_reference.name:
return [
InputFieldClass(
schemaFieldUrn=builder.make_schema_field_urn(
entity_urn, view_field_for_reference.name
),
schemaField=LookerUtil.view_field_to_schema_field(
view_field_for_reference,
self.reporter,
self.source_config.tag_measures_and_dimensions,
),
)
]
return []

)
view_field_for_reference = input_field.view_field

Expand Down Expand Up @@ -1220,8 +1278,8 @@ def _make_metrics_dimensions_dashboard_mcp(
def _make_metrics_dimensions_chart_mcp(
self, dashboard_element: LookerDashboardElement
) -> MetadataChangeProposalWrapper:
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
chart_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
)
input_fields_aspect = InputFieldsClass(
fields=self._input_fields_from_dashboard_element(dashboard_element)
Expand Down Expand Up @@ -1513,6 +1571,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

looker_dashboards_for_usage: List[looker_usage.LookerDashboardForUsage] = []

# Emit platform instance entity
if self.source_config.platform_instance:
platform_instance_urn = builder.make_dataplatform_instance_urn(
platform=self.source_config.platform_name,
instance=self.source_config.platform_instance,
)

yield MetadataWorkUnit(
id=f"{platform_instance_urn}-aspect-dataplatformInfo",
mcp=MetadataChangeProposalWrapper(
entityUrn=platform_instance_urn,
aspect=DataPlatformInfoClass(
name=self.source_config.platform_instance,
type=PlatformTypeClass.OTHERS,
datasetNameDelimiter=".",
),
),
)

with self.reporter.report_stage("dashboard_chart_metadata"):
for job in BackpressureAwareExecutor.map(
self.process_dashboard,
Expand Down
Loading
Loading