Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/glue): Add support for missing config options for profiling in Glue #10858

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ New (optional fields `systemMetadata` and `headers`):
"headers": {}
}
```
- #10858 Profiling configuration for Glue source has been updated.

Previously, supplying a `profiling` block (even an empty one) was enough to enable profiling:
```yaml
profiling: {}
```

Now, profiling is disabled by default and must be enabled explicitly:

```yaml
profiling:
  enabled: true
```

### Potential Downtime

Expand Down
71 changes: 37 additions & 34 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ class GlueSourceConfig(
default=False,
description="If an S3 Objects Tags should be created for the Tables ingested by Glue.",
)
profiling: Optional[GlueProfilingConfig] = Field(
default=None,
profiling: GlueProfilingConfig = Field(
default_factory=GlueProfilingConfig,
description="Configs to ingest data profiles from glue table",
)
# Custom Stateful Ingestion settings
Expand All @@ -186,7 +186,7 @@ class GlueSourceConfig(
)

def is_profiling_enabled(self) -> bool:
return self.profiling is not None and is_profiling_enabled(
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

Expand Down Expand Up @@ -867,34 +867,39 @@ def _create_profile_mcp(
# instantiate column profile class for each column
column_profile = DatasetFieldProfileClass(fieldPath=column_name)

if self.source_config.profiling.unique_count in column_params:
column_profile.uniqueCount = int(
float(column_params[self.source_config.profiling.unique_count])
)
if self.source_config.profiling.unique_proportion in column_params:
column_profile.uniqueProportion = float(
column_params[self.source_config.profiling.unique_proportion]
)
if self.source_config.profiling.null_count in column_params:
column_profile.nullCount = int(
float(column_params[self.source_config.profiling.null_count])
)
if self.source_config.profiling.null_proportion in column_params:
column_profile.nullProportion = float(
column_params[self.source_config.profiling.null_proportion]
)
if self.source_config.profiling.min in column_params:
column_profile.min = column_params[self.source_config.profiling.min]
if self.source_config.profiling.max in column_params:
column_profile.max = column_params[self.source_config.profiling.max]
if self.source_config.profiling.mean in column_params:
column_profile.mean = column_params[self.source_config.profiling.mean]
if self.source_config.profiling.median in column_params:
column_profile.median = column_params[
self.source_config.profiling.median
]
if self.source_config.profiling.stdev in column_params:
column_profile.stdev = column_params[self.source_config.profiling.stdev]
if not self.source_config.profiling.profile_table_level_only:
if self.source_config.profiling.unique_count in column_params:
column_profile.uniqueCount = int(
float(column_params[self.source_config.profiling.unique_count])
)
if self.source_config.profiling.unique_proportion in column_params:
column_profile.uniqueProportion = float(
column_params[self.source_config.profiling.unique_proportion]
)
if self.source_config.profiling.null_count in column_params:
column_profile.nullCount = int(
float(column_params[self.source_config.profiling.null_count])
)
if self.source_config.profiling.null_proportion in column_params:
column_profile.nullProportion = float(
column_params[self.source_config.profiling.null_proportion]
)
if self.source_config.profiling.min in column_params:
column_profile.min = column_params[self.source_config.profiling.min]
if self.source_config.profiling.max in column_params:
column_profile.max = column_params[self.source_config.profiling.max]
if self.source_config.profiling.mean in column_params:
column_profile.mean = column_params[
self.source_config.profiling.mean
]
if self.source_config.profiling.median in column_params:
column_profile.median = column_params[
self.source_config.profiling.median
]
if self.source_config.profiling.stdev in column_params:
column_profile.stdev = column_params[
self.source_config.profiling.stdev
]
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved

dataset_profile.fieldProfiles.append(column_profile)

Expand All @@ -914,9 +919,7 @@ def _create_profile_mcp(
def get_profile_if_enabled(
self, mce: MetadataChangeEventClass, database_name: str, table_name: str
) -> Iterable[MetadataWorkUnit]:
# We don't need both checks only the second one
# but then lint believes that GlueProfilingConfig can be None
if self.source_config.profiling and self.source_config.is_profiling_enabled():
if self.source_config.is_profiling_enabled():
# for cross-account ingestion
kwargs = dict(
DatabaseName=database_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@


class GlueProfilingConfig(ConfigModel):
enabled: bool = Field(
default=False,
description="Whether profiling should be done.",
)
profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only, or include column-level profiling as well.",
)
row_count: Optional[str] = Field(
default=None,
description="The parameter name for row count in glue table.",
Expand Down
Loading
Loading