feat(ingest/nifi): ingest process group as browse path v2, incremental lineage #10202

Merged · 7 commits · Apr 9, 2024
17 changes: 17 additions & 0 deletions metadata-ingestion/docs/sources/nifi/nifi_pre.md
@@ -0,0 +1,17 @@
### Concept Mapping

| Source Concept | DataHub Concept | Notes |
| --------------------------------- | --------------------------------------------------------- | ----------------------- |
| `"Nifi"` | [Data Platform](../../metamodel/entities/dataPlatform.md) | |
| Nifi flow | [Data Flow](../../metamodel/entities/dataFlow.md) | |
| Nifi Ingress / Egress Processor | [Data Job](../../metamodel/entities/dataJob.md) | |
| Nifi Remote Port | [Data Job](../../metamodel/entities/dataJob.md) | |
| Nifi Port with remote connections | [Dataset](../../metamodel/entities/dataset.md) | |
| Nifi Process Group | [Container](../../metamodel/entities/container.md) | Subtype `Process Group` |

### Caveats
- This plugin extracts lineage information between external datasets and ingress/egress processors by analyzing provenance events. Check your Nifi configuration to confirm the maximum retention period of provenance events, and make sure that ingestion runs frequently enough to read provenance events before they disappear. A sketch of a recipe that accounts for this is shown after this list.

- Only a limited set of ingress/egress processors is supported:
- S3: `ListS3`, `FetchS3Object`, `PutS3Object`
- SFTP: `ListSFTP`, `FetchSFTP`, `GetSFTP`, `PutSFTP`
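A minimal sketch of a recipe that uses this source, run programmatically via DataHub's Pipeline API. The URL, credentials, and sink address are hypothetical placeholders, and the auth mode shown is just one of the supported options; `emit_process_group_as_container` and `incremental_lineage` are the options added in this PR.

```python
# Hedged sketch: minimal NiFi ingestion recipe executed with DataHub's Pipeline API.
# site_url, credentials, and the sink server below are hypothetical placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "nifi",
            "config": {
                "site_url": "https://mynifi.domain/nifi/",
                "auth": "SINGLE_USER",
                "username": "admin",
                "password": "admin-password",
                # Keep runs well inside the provenance retention window (see caveat above).
                "provenance_days": 7,
                # Options added in this PR:
                "emit_process_group_as_container": True,
                "incremental_lineage": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```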
metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py
@@ -45,6 +45,10 @@ class BIContainerSubTypes(str, Enum):
QLIK_APP = "Qlik App"


class JobContainerSubTypes(str, Enum):
NIFI_PROCESS_GROUP = "Process Group"


class BIAssetSubTypes(str, Enum):
# Generic SubTypes
REPORT = "Report"
164 changes: 143 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -23,6 +23,7 @@
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import EnvConfigMixin
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ContainerKey, gen_containers
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
@@ -33,13 +34,17 @@
)
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import JobContainerSubTypes
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsV2Class,
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
DataPlatformInstanceClass,
DatasetPropertiesClass,
)
from datahub.specific.datajob import DataJobPatchBuilder

logger = logging.getLogger(__name__)
NIFI = "nifi"
@@ -70,6 +75,10 @@ class NifiAuthType(Enum):
BASIC_AUTH = "BASIC_AUTH"


class ProcessGroupKey(ContainerKey):
process_group_id: str


class NifiSourceConfig(EnvConfigMixin):
site_url: str = Field(
description="URL for Nifi, ending with /nifi/. e.g. https://mynifi.domain/nifi/"
@@ -123,7 +132,21 @@ class NifiSourceConfig(EnvConfigMixin):
# root CA trusted by client system, e.g. self-signed certificates
ca_file: Optional[Union[bool, str]] = Field(
default=None,
description="Path to PEM file containing certs for the root CA(s) for the NiFi",
description="Path to PEM file containing certs for the root CA(s) for the NiFi."
"Set to False to disable SSL verification.",
)

# As of now, retrieving a container's entities only considers the container aspect, not browsePathsV2.
# Consider enabling this once entities whose browsePathsV2 points to a container are also listed among the container's entities.
emit_process_group_as_container: bool = Field(
Collaborator: this is only needed until we show browsePathV2 on entity cards, right?

Collaborator Author: yes

default=False,
description="Whether to emit Nifi process groups as container entities.",
)

incremental_lineage: bool = Field(
default=True,
description="When enabled, emits incremental/patch lineage for Nifi processors."
" When disabled, re-states lineage on each run.",
)

@root_validator(skip_on_failure=True)
@@ -364,21 +387,6 @@ def report_dropped(self, ent_name: str) -> None:
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.LINEAGE_COARSE, "Supported. See docs for limitations")
class NifiSource(Source):
"""
This plugin extracts the following:

- NiFi flow as `DataFlow` entity
- Ingress, egress processors, remote input and output ports as `DataJob` entity
- Input and output ports receiving remote connections as `Dataset` entity
- Lineage information between external datasets and ingress/egress processors by analyzing provenance events

Current limitations:

- Limited ingress/egress processors are supported
- S3: `ListS3`, `FetchS3Object`, `PutS3Object`
- SFTP: `ListSFTP`, `FetchSFTP`, `GetSFTP`, `PutSFTP`

"""

config: NifiSourceConfig
report: NifiSourceReport
@@ -392,6 +400,10 @@ def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None:
if self.config.ca_file is not None:
self.session.verify = self.config.ca_file

# To keep track of process groups (containers) which have already been ingested
# Required, as we do not ingest all process groups but only those that have known ingress/egress processors
self.processed_pgs: List[str] = []
Collaborator: this is only used when emit_process_group_as_container is enabled, right?

Collaborator Author: yes


@cached_property
def rest_api_base_url(self):
return self.config.site_url[: -len("nifi/")] + "nifi-api/"
@@ -794,7 +806,7 @@ def delete_provenance(self, provenance_uri):
def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
rootpg = self.nifi_flow.root_process_group
flow_name = rootpg.name # self.config.site_name
flow_urn = builder.make_data_flow_urn(NIFI, rootpg.id, self.config.env)
flow_urn = self.make_flow_urn()
flow_properties = {}
if self.nifi_flow.clustered is not None:
flow_properties["clustered"] = str(self.nifi_flow.clustered)
@@ -927,9 +939,16 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
)
break

if self.config.emit_process_group_as_container:
# We emit process groups only for nifi components qualifying as datajobs
yield from self.construct_process_group_workunits(
component.parent_group_id
)

yield from self.construct_job_workunits(
job_urn,
job_name,
component.parent_group_id,
external_url=self.make_external_url(
component.parent_group_id, component.id, component.parent_rpg_id
),
@@ -951,6 +970,11 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
external_url=self.make_external_url(port.parent_group_id, port.id),
)

def make_flow_urn(self) -> str:
return builder.make_data_flow_urn(
NIFI, self.nifi_flow.root_process_group.id, self.config.env
)

def process_provenance_events(self):
startDate = datetime.now(timezone.utc) - timedelta(
days=self.config.provenance_days
@@ -1083,6 +1107,7 @@ def construct_job_workunits(
self,
job_urn: str,
job_name: str,
parent_group_id: str,
external_url: str,
job_type: str,
description: Optional[str],
@@ -1107,17 +1132,114 @@
),
).as_workunit()

# If dataJob had container aspect, we would ideally only emit it
# and browse path v2 would automatically be generated.
yield self.gen_browse_path_v2_workunit(job_urn, parent_group_id)

inlets.sort()
outlets.sort()
inputJobs.sort()

yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=DataJobInputOutputClass(
inputDatasets=inlets, outputDatasets=outlets, inputDatajobs=inputJobs
if self.config.incremental_lineage:
Collaborator Author: Ideally, I would also like to patch DataJobInfo, so that the last_event_time custom property (which records when the DataJob last processed an event) is not overwritten. However, DataJobPatchBuilder does not currently support patching this aspect. I will add this in a follow-up.

patch_builder: DataJobPatchBuilder = DataJobPatchBuilder(job_urn)
for inlet in inlets:
patch_builder.add_input_dataset(inlet)
for outlet in outlets:
patch_builder.add_output_dataset(outlet)
for inJob in inputJobs:
patch_builder.add_input_datajob(inJob)
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
id=f"{job_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
)
Collaborator: eventually I would like to move this into auto_incremental_lineage

Collaborator Author: right!

else:
yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=DataJobInputOutputClass(
inputDatasets=inlets,
outputDatasets=outlets,
inputDatajobs=inputJobs,
),
).as_workunit()
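To make the difference between the two branches concrete, here is a hedged, standalone sketch of what the incremental branch emits; the job and dataset URNs are hypothetical examples.

```python
# Hedged sketch: building the same lineage as patch MCPs outside the source.
# URNs are hypothetical; DataJobPatchBuilder emits PATCH-style proposals for
# dataJobInputOutput, so existing edges are added to rather than replaced.
import datahub.emitter.mce_builder as builder
from datahub.specific.datajob import DataJobPatchBuilder

job_urn = builder.make_data_job_urn("nifi", "root-pg-id", "processor-id", "PROD")

patch_builder = DataJobPatchBuilder(job_urn)
patch_builder.add_input_dataset(builder.make_dataset_urn("s3", "my-bucket/input", "PROD"))
patch_builder.add_output_dataset(builder.make_dataset_urn("s3", "my-bucket/output", "PROD"))

for mcp in patch_builder.build():
    # Each proposal carries a JSON-patch payload (e.g. "add" ops on /inputDatasets/...),
    # which is what lets lineage accumulate across runs instead of being re-stated.
    print(mcp.entityUrn, mcp.aspectName)
```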

def gen_browse_path_v2_workunit(
self, entity_urn: str, process_group_id: str
) -> MetadataWorkUnit:
flow_urn = self.make_flow_urn()
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=BrowsePathsV2Class(
path=[
BrowsePathEntryClass(id=flow_urn, urn=flow_urn),
*self._get_browse_path_v2_entries(process_group_id),
]
),
).as_workunit()

def _get_browse_path_v2_entries(
self, process_group_id: str
) -> List[BrowsePathEntryClass]:
"""Browse path entries till current process group"""
if self._is_root_process_group(process_group_id):
return []

current_process_group = self.nifi_flow.processGroups[process_group_id]
assert (
current_process_group.parent_group_id
) # always present for non-root process group
parent_browse_path = self._get_browse_path_v2_entries(
current_process_group.parent_group_id
)

if self.config.emit_process_group_as_container:
container_urn = self.gen_process_group_key(process_group_id).as_urn()
current_browse_entry = BrowsePathEntryClass(
id=container_urn, urn=container_urn
)
else:
current_browse_entry = BrowsePathEntryClass(id=current_process_group.name)
return parent_browse_path + [current_browse_entry]
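For illustration, a hedged sketch (with hypothetical URNs) of the aspect this recursion produces for a job nested two process groups deep when emit_process_group_as_container is enabled; the first entry is always the root flow URN, and each subsequent entry is the container URN of an ancestor process group.

```python
# Hedged sketch with hypothetical URNs: browse path for a processor under
# root / <pg-1> / <pg-2> when emit_process_group_as_container is enabled.
from datahub.metadata.schema_classes import BrowsePathEntryClass, BrowsePathsV2Class

flow_urn = "urn:li:dataFlow:(nifi,<root-process-group-id>,PROD)"
pg1_urn = "urn:li:container:<guid-for-pg-1>"
pg2_urn = "urn:li:container:<guid-for-pg-2>"

browse_path = BrowsePathsV2Class(
    path=[
        BrowsePathEntryClass(id=flow_urn, urn=flow_urn),
        BrowsePathEntryClass(id=pg1_urn, urn=pg1_urn),
        BrowsePathEntryClass(id=pg2_urn, urn=pg2_urn),
    ]
)
# With the flag disabled, the process-group entries carry only the group names
# as ids and no container urn.
```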

def _is_root_process_group(self, process_group_id: str) -> bool:
return self.nifi_flow.root_process_group.id == process_group_id

def construct_process_group_workunits(
self, process_group_id: str
) -> Iterable[MetadataWorkUnit]:
if (
self._is_root_process_group(process_group_id)
or process_group_id in self.processed_pgs
):
return
self.processed_pgs.append(process_group_id)

pg = self.nifi_flow.processGroups[process_group_id]
container_key = self.gen_process_group_key(process_group_id)
yield from gen_containers(
container_key=container_key,
name=pg.name,
sub_types=[JobContainerSubTypes.NIFI_PROCESS_GROUP],
parent_container_key=(
self.gen_process_group_key(pg.parent_group_id)
if pg.parent_group_id
and not self._is_root_process_group(pg.parent_group_id)
else None
),
)

if pg.parent_group_id: # always true for non-root process group
yield from self.construct_process_group_workunits(pg.parent_group_id)

if self._is_root_process_group(pg.parent_group_id):
yield self.gen_browse_path_v2_workunit(
container_key.as_urn(), pg.parent_group_id
)

def gen_process_group_key(self, process_group_id: str) -> ProcessGroupKey:
return ProcessGroupKey(
process_group_id=process_group_id, platform=NIFI, env=self.config.env
Collaborator: are process_group_ids globally unique within nifi? it seems like they're scoped to the flow

Collaborator Author: I think this is safe. For a single Nifi deployment (clustered or standalone), the process group id is unique. As of now, the entire deployment is represented as a single dataflow, since nothing else comes closer. In the future, if multiple Nifi deployments come into the picture (which seems rare, given the complexity of a single Nifi deployment), adding platform-instance capability to the nifi source should handle uniqueness.

Collaborator: sounds good

)
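To make the uniqueness point above concrete, a hedged sketch of how a process group id becomes a container URN; the id is a hypothetical placeholder for the UUID Nifi assigns, and ProcessGroupKey is redeclared only so the snippet is self-contained.

```python
# Hedged sketch: container URN generation for a process group.
from datahub.emitter.mcp_builder import ContainerKey

class ProcessGroupKey(ContainerKey):
    process_group_id: str

key = ProcessGroupKey(
    process_group_id="0184d2f5-0000-1000-ffff-ffffa1b2c3d4",  # hypothetical Nifi UUID
    platform="nifi",
    env="PROD",
)

# as_urn() derives a deterministic guid from the key's fields, so the same
# process group always maps to the same container URN for this deployment.
print(key.as_urn())  # e.g. urn:li:container:<deterministic-guid>
```

If platform-instance support is added later, it would presumably feed into the same key and keep containers from separate Nifi deployments distinct.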

def construct_dataset_workunits(
self,
dataset_platform: str,
Expand Down