Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/sagemaker): ensure consistent STS token usage with refresh mechanism #11170

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import boto3
Expand Down Expand Up @@ -73,6 +74,8 @@ class AwsConnectionConfig(ConfigModel):
- dbt source
"""

_credentials_expiration: Optional[datetime] = None

aws_access_key_id: Optional[str] = Field(
default=None,
description=f"AWS access key ID. {AUTODETECT_CREDENTIALS_DOC_LINK}",
Expand Down Expand Up @@ -115,6 +118,11 @@ class AwsConnectionConfig(ConfigModel):
description="Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
)

def allowed_cred_refresh(self) -> bool:
if self._normalized_aws_roles():
return True
return False

def _normalized_aws_roles(self) -> List[AwsAssumeRoleConfig]:
if not self.aws_role:
return []
Expand Down Expand Up @@ -153,11 +161,14 @@ def get_session(self) -> Session:
}

for role in self._normalized_aws_roles():
credentials = assume_role(
role,
self.aws_region,
credentials=credentials,
)
if self._should_refresh_credentials():
credentials = assume_role(
role,
self.aws_region,
credentials=credentials,
)
if isinstance(credentials["Expiration"], datetime):
self._credentials_expiration = credentials["Expiration"]

session = Session(
aws_access_key_id=credentials["AccessKeyId"],
Expand All @@ -168,6 +179,12 @@ def get_session(self) -> Session:

return session

def _should_refresh_credentials(self) -> bool:
if self._credentials_expiration is None:
return True
remaining_time = self._credentials_expiration - datetime.now(timezone.utc)
return remaining_time < timedelta(minutes=5)
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved

def get_credentials(self) -> Dict[str, Optional[str]]:
credentials = self.get_session().get_credentials()
if credentials is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# extract jobs if specified
if self.source_config.extract_jobs is not False:
job_processor = JobProcessor(
sagemaker_client=self.sagemaker_client,
sagemaker_client=self.source_config.get_auto_refreshing_sagemaker_client()
if self.source_config.allowed_cred_refresh()
else self.sagemaker_client,
env=self.env,
report=self.report,
job_type_filter=self.source_config.extract_jobs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class SagemakerSourceConfig(
def sagemaker_client(self):
return self.get_sagemaker_client()

def get_auto_refreshing_sagemaker_client(self):
"""
Returns a reference to the SageMaker client function.
This is used to create a fresh client each time it is called.
"""
return self.get_sagemaker_client


@dataclass
class SagemakerSourceReport(StaleEntityRemovalSourceReport):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from types import MethodType
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -147,7 +148,8 @@ class JobProcessor:
"""

# boto3 SageMaker client
sagemaker_client: "SageMakerClient"
sagemaker_client: Any

env: str
report: SagemakerSourceReport
# config filter for specific job types to ingest (see metadata-ingestion README)
Expand All @@ -170,8 +172,7 @@ class JobProcessor:

def get_jobs(self, job_type: JobType, job_spec: JobInfo) -> List[Any]:
jobs = []

paginator = self.sagemaker_client.get_paginator(job_spec.list_command)
paginator = self.get_sagemaker_client().get_paginator(job_spec.list_command)
for page in paginator.paginate():
page_jobs: List[Any] = page[job_spec.list_key]

Expand Down Expand Up @@ -269,7 +270,7 @@ def get_job_details(self, job_name: str, job_type: JobType) -> Dict[str, Any]:
describe_command = job_type_to_info[job_type].describe_command
describe_name_key = job_type_to_info[job_type].describe_name_key

return getattr(self.sagemaker_client, describe_command)(
return getattr(self.get_sagemaker_client(), describe_command)(
**{describe_name_key: job_name}
)

Expand Down Expand Up @@ -940,3 +941,8 @@ def process_transform_job(self, job: Dict[str, Any]) -> SageMakerJob:
output_datasets=output_datasets,
input_jobs=input_jobs,
)

def get_sagemaker_client(self) -> "SageMakerClient":
if isinstance(self.sagemaker_client, MethodType):
return self.sagemaker_client()
return self.sagemaker_client
Loading