Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datahub metadata-ingestion support for preset #10954

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"preset = datahub.ingestion.source.preset:PresetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
Expand Down
119 changes: 119 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging
from typing import Dict, Optional

import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.configuration import ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
    capability,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)
from datahub.ingestion.source.superset import SupersetSource
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)
class PresetConfig(StatefulIngestionConfigBase, ConfigModel):
    """Configuration for the Preset source.

    Preset authenticates with an API token pair (``api_key``/``api_secret``)
    against the Preset manager API (``manager_uri``), unlike Superset's
    username/password login against the workspace itself.
    """

    manager_uri: str = Field(
        default="https://api.app.preset.io", description="Preset.io API URL"
    )
    # NOTE: annotated Optional[str] to agree with the `default=None`; the
    # original declared plain `str`, contradicting its None default.
    connect_uri: Optional[str] = Field(
        default=None, description="Preset workspace URL."
    )
    display_uri: Optional[str] = Field(
        default=None,
        description="optional URL to use in links (if `connect_uri` is only for ingestion)",
    )
    api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
    api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

    # Configuration for stateful ingestion (stale-entity removal).
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
        default=None, description="Preset Stateful Ingestion Config."
    )

    options: Dict = Field(default={}, description="")
    env: str = Field(
        default=DEFAULT_ENV,
        description="Environment to use in namespace when constructing URNs",
    )
    database_alias: Dict[str, str] = Field(
        default={},
        description="Can be used to change mapping for database names in superset to what you have in datahub",
    )

    @validator("connect_uri", "display_uri")
    def remove_trailing_slash(cls, v):
        # Normalize URIs so later f-string concatenation does not yield "//".
        # Both fields are optional, so pass None through untouched.
        if v is None:
            return v
        return config_clean.remove_trailing_slashes(v)

    @root_validator
    def default_display_uri_to_connect_uri(cls, values):
        # If no dedicated display URI was supplied, fall back to the
        # ingestion (connect) URI for links.
        base = values.get("display_uri")
        if base is None:
            values["display_uri"] = values.get("connect_uri")
        return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
"""
Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
"""

config: PresetConfig
report: StaleEntityRemovalSourceReport
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
    """Initialize the Preset source.

    ``SupersetSource.__init__`` already invokes ``self.login()``, which
    dynamically dispatches to the overridden ``PresetSource.login`` below —
    so we must not call it again here (the original called it twice, issuing
    two authentication requests).
    """
    # Lazy %-style args avoid formatting cost when INFO is disabled.
    logger.info("ctx is %s", ctx)

    super().__init__(ctx, config)
    self.config = config
    self.report = StaleEntityRemovalSourceReport()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if super() already calls login, we don't need to explicitly call it again, right?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're actually looking to overwrite the login in superset since the authentication method is different (Superset is username/password, Preset is API token based name/secret and also has a different URL)


def login(self):
    """Authenticate against the Preset manager API and open an HTTP session.

    Overrides ``SupersetSource.login``: Preset uses API-token (name/secret)
    authentication against ``manager_uri`` instead of Superset's
    username/password login against the workspace.

    Raises:
        requests.exceptions.RequestException: if the token request fails
            at transport level or returns a non-2xx status.
    """
    logger.info("self.config is %s", self.config)

    try:
        login_response = requests.post(
            f"{self.config.manager_uri}/v1/auth/",
            json={"name": self.config.api_key, "secret": self.config.api_secret},
        )
        # Surface HTTP errors explicitly instead of failing later with an
        # opaque KeyError when the expected payload keys are missing.
        login_response.raise_for_status()
        self.access_token = login_response.json()["payload"]["access_token"]
    except requests.exceptions.RequestException as e:
        logger.error("Failed to authenticate with Preset: %s", e)
        raise
    logger.debug("Got access token from Preset")

    self.session = requests.Session()
    self.session.headers.update(
        {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
            "Accept": "*/*",
        }
    )

    # Test the connection; deliberately best-effort — log and continue on
    # failure, preserving the original behavior of not aborting here.
    test_response = self.session.get(f"{self.config.connect_uri}/version")
    if not test_response.ok:
        logger.error("Unable to connect to workspace")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure proper error handling for the login process.

The login method should handle potential errors during the authentication process more robustly.

-        login_response = requests.post(
+        try:
+            login_response = requests.post(
-        self.access_token = login_response.json()["payload"]["access_token"]
+            login_response.raise_for_status()
+            self.access_token = login_response.json()["payload"]["access_token"]
+        except requests.exceptions.RequestException as e:
+            logger.error(f"Failed to authenticate with Preset: {e}")
+            raise

Committable suggestion was skipped due to low confidence.

8 changes: 7 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import (
EnvConfigMixin,
Expand All @@ -19,7 +20,10 @@
make_dataset_urn,
make_domain_urn,
)
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.emitter.mcp_builder import (
add_domain_to_entity_wu
)
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -179,7 +183,9 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(config, ctx)
self.config = config
self.report = StaleEntityRemovalSourceReport()
self.login()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be cleaner to make this self.session = self.login()

also the domain_registry logic should not be in the login method


def login(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Address the TODO comment.

There is a TODO comment regarding how to message about connection errors. Ensure that appropriate error messaging is implemented.

Do you want me to help implement the error messaging or open a GitHub issue to track this task?

login_response = requests.post(
f"{self.config.connect_uri}/api/v1/security/login",
json={
Expand Down
Loading