Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB IAM auth #10419

Merged
merged 5 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ source:
platform_instance: "AWS_ACCOUNT_ID"
aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
aws_session_token: "${AWS_SESSION_TOKEN}"
aws_region: "${AWS_REGION}"
#
# If there are items that have most representative fields of the table, users could use the
# `include_table_item` option to provide a list of primary keys of the table in dynamodb format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from datahub.configuration.source_common import EnvConfigMixin

if TYPE_CHECKING:
from mypy_boto3_dynamodb import DynamoDBClient
from mypy_boto3_glue import GlueClient
from mypy_boto3_s3 import S3Client, S3ServiceResource
from mypy_boto3_sagemaker import SageMakerClient
Expand Down Expand Up @@ -214,6 +215,9 @@ def get_s3_resource(
def get_glue_client(self) -> "GlueClient":
return self.get_session().client("glue", config=self._aws_config())

def get_dynamodb_client(self) -> "DynamoDBClient":
return self.get_session().client("dynamodb", config=self._aws_config())

def get_sagemaker_client(self) -> "SageMakerClient":
return self.get_session().client("sagemaker", config=self._aws_config())

Expand All @@ -224,6 +228,7 @@ class AwsSourceConfig(EnvConfigMixin, AwsConnectionConfig):

Currently used by:
- Glue source
- DynamoDB source
- SageMaker source
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
Union,
)

import boto3
import pydantic
from pydantic.fields import Field

from datahub.configuration.common import AllowDenyPattern
Expand Down Expand Up @@ -42,6 +40,7 @@
ClassificationSourceConfigMixin,
classification_workunit_processor,
)
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.dynamodb.data_reader import DynamoDBTableItemsReader
from datahub.ingestion.source.schema_inference.object import SchemaDescription
from datahub.ingestion.source.state.stale_entity_removal_handler import (
Expand Down Expand Up @@ -93,12 +92,8 @@ class DynamoDBConfig(
DatasetSourceConfigMixin,
StatefulIngestionConfigBase,
ClassificationSourceConfigMixin,
AwsSourceConfig,
):
# TODO: refactor the config to use AwsConnectionConfig and create a method get_dynamodb_client
# in the class to provide optional region name input
aws_access_key_id: str = Field(description="AWS Access Key ID.")
aws_secret_access_key: pydantic.SecretStr = Field(description="AWS Secret Key.")

domain: Dict[str, AllowDenyPattern] = Field(
default=dict(),
description="regex patterns for tables to filter to assign domain_key. ",
Expand All @@ -120,6 +115,10 @@ class DynamoDBConfig(
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@property
def dynamodb_client(self):
return self.get_dynamodb_client()


@dataclass
class DynamoDBSourceReport(StaleEntityRemovalSourceReport, ClassificationReportMixin):
Expand Down Expand Up @@ -212,41 +211,27 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# This is a offline call to get available region names from botocore library
session = boto3.Session()
dynamodb_regions = session.get_available_regions("dynamodb")
logger.info(f"region names {dynamodb_regions}")

# traverse databases in sorted order so output is consistent
for region in dynamodb_regions:
eboneil marked this conversation as resolved.
Show resolved Hide resolved
logger.info(f"Processing region {region}")
# create a new dynamodb client for each region,
# it seems for one client we could only list the table of one specific region,
# the list_tables() method don't take any config that related to region
dynamodb_client = boto3.client(
"dynamodb",
region_name=region,
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(),
)
data_reader = DynamoDBTableItemsReader.create(dynamodb_client)
dynamodb_client = self.config.dynamodb_client
region = dynamodb_client.meta.region_name

for table_name in self._list_tables(dynamodb_client):
dataset_name = f"{region}.{table_name}"
if not self.config.table_pattern.allowed(dataset_name):
logger.debug(f"skipping table: {dataset_name}")
self.report.report_dropped(dataset_name)
continue
data_reader = DynamoDBTableItemsReader.create(dynamodb_client)

table_wu_generator = self._process_table(
region, dynamodb_client, table_name, dataset_name
)
yield from classification_workunit_processor(
table_wu_generator,
self.classification_handler,
data_reader,
[region, table_name],
)
for table_name in self._list_tables(dynamodb_client):
dataset_name = f"{region}.{table_name}"
if not self.config.table_pattern.allowed(dataset_name):
logger.debug(f"skipping table: {dataset_name}")
self.report.report_dropped(dataset_name)
continue

table_wu_generator = self._process_table(
region, dynamodb_client, table_name, dataset_name
)
yield from classification_workunit_processor(
table_wu_generator,
self.classification_handler,
data_reader,
[region, table_name],
)

def _process_table(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def test_dynamodb(pytestconfig, tmp_path):
"config": {
"aws_access_key_id": "test",
"aws_secret_access_key": "test",
"aws_session_token": "test",
"aws_region": "us-west-2",
},
},
"sink": {
Expand Down Expand Up @@ -97,6 +99,8 @@ def test_dynamodb(pytestconfig, tmp_path):
"platform_instance": "dynamodb_test",
"aws_access_key_id": "test",
"aws_secret_access_key": "test",
"aws_session_token": "test",
"aws_region": "us-west-2",
"classification": ClassificationConfig(
enabled=True,
classifiers=[
Expand Down
Loading