Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB IAM auth #10419

Merged
merged 5 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Breaking Changes

- #10419 - `aws_session_token` and `aws_region` are now required configurations in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.

### Potential Downtime

### Deprecations
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
### Prerequisites

Note on a breaking change: in the latest version of the DynamoDB connector, both `aws_session_token` and `aws_region` are required configurations. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.

In order to execute this source, you need to attach the `AmazonDynamoDBReadOnlyAccess` policy to a user in your AWS account. Then create an API access key and secret for the user.

For a user to be able to create an API access key, it needs the following access key permissions. Your AWS account admin can create a policy with these permissions and attach it to the user; you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html)
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ source:
platform_instance: "AWS_ACCOUNT_ID"
aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
aws_session_token: "${AWS_SESSION_TOKEN}"
aws_region: "${AWS_REGION}"
#
# If there are items that have most representative fields of the table, users could use the
# `include_table_item` option to provide a list of primary keys of the table in dynamodb format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from datahub.configuration.source_common import EnvConfigMixin

if TYPE_CHECKING:
from mypy_boto3_dynamodb import DynamoDBClient
from mypy_boto3_glue import GlueClient
from mypy_boto3_s3 import S3Client, S3ServiceResource
from mypy_boto3_sagemaker import SageMakerClient
Expand Down Expand Up @@ -214,6 +215,9 @@ def get_s3_resource(
def get_glue_client(self) -> "GlueClient":
return self.get_session().client("glue", config=self._aws_config())

def get_dynamodb_client(self) -> "DynamoDBClient":
    """Build a low-level DynamoDB client from this connection's boto3 session.

    Region and credentials come from the shared AWS connection config, so the
    client is pinned to the single configured region.
    """
    session = self.get_session()
    return session.client("dynamodb", config=self._aws_config())

def get_sagemaker_client(self) -> "SageMakerClient":
return self.get_session().client("sagemaker", config=self._aws_config())

Expand All @@ -224,6 +228,7 @@ class AwsSourceConfig(EnvConfigMixin, AwsConnectionConfig):

Currently used by:
- Glue source
- DynamoDB source
- SageMaker source
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
Union,
)

import boto3
import pydantic
from pydantic.fields import Field

from datahub.configuration.common import AllowDenyPattern
Expand Down Expand Up @@ -42,6 +40,7 @@
ClassificationSourceConfigMixin,
classification_workunit_processor,
)
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.dynamodb.data_reader import DynamoDBTableItemsReader
from datahub.ingestion.source.schema_inference.object import SchemaDescription
from datahub.ingestion.source.state.stale_entity_removal_handler import (
Expand Down Expand Up @@ -93,12 +92,8 @@ class DynamoDBConfig(
DatasetSourceConfigMixin,
StatefulIngestionConfigBase,
ClassificationSourceConfigMixin,
AwsSourceConfig,
):
# TODO: refactor the config to use AwsConnectionConfig and create a method get_dynamodb_client
# in the class to provide optional region name input
aws_access_key_id: str = Field(description="AWS Access Key ID.")
aws_secret_access_key: pydantic.SecretStr = Field(description="AWS Secret Key.")

domain: Dict[str, AllowDenyPattern] = Field(
default=dict(),
description="regex patterns for tables to filter to assign domain_key. ",
Expand All @@ -120,6 +115,10 @@ class DynamoDBConfig(
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@property
def dynamodb_client(self):
    # Convenience accessor: delegates to get_dynamodb_client() inherited from
    # AwsSourceConfig, which creates the client from the configured
    # region/credentials. Exposed as a property so call sites can treat the
    # client as plain config state.
    return self.get_dynamodb_client()


@dataclass
class DynamoDBSourceReport(StaleEntityRemovalSourceReport, ClassificationReportMixin):
Expand Down Expand Up @@ -212,41 +211,27 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# This is a offline call to get available region names from botocore library
session = boto3.Session()
dynamodb_regions = session.get_available_regions("dynamodb")
logger.info(f"region names {dynamodb_regions}")

# traverse databases in sorted order so output is consistent
for region in dynamodb_regions:
eboneil marked this conversation as resolved.
Show resolved Hide resolved
logger.info(f"Processing region {region}")
# create a new dynamodb client for each region,
# it seems for one client we could only list the table of one specific region,
# the list_tables() method don't take any config that related to region
dynamodb_client = boto3.client(
"dynamodb",
region_name=region,
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(),
)
data_reader = DynamoDBTableItemsReader.create(dynamodb_client)
dynamodb_client = self.config.dynamodb_client
region = dynamodb_client.meta.region_name

for table_name in self._list_tables(dynamodb_client):
dataset_name = f"{region}.{table_name}"
if not self.config.table_pattern.allowed(dataset_name):
logger.debug(f"skipping table: {dataset_name}")
self.report.report_dropped(dataset_name)
continue
data_reader = DynamoDBTableItemsReader.create(dynamodb_client)

table_wu_generator = self._process_table(
region, dynamodb_client, table_name, dataset_name
)
yield from classification_workunit_processor(
table_wu_generator,
self.classification_handler,
data_reader,
[region, table_name],
)
for table_name in self._list_tables(dynamodb_client):
dataset_name = f"{region}.{table_name}"
if not self.config.table_pattern.allowed(dataset_name):
logger.debug(f"skipping table: {dataset_name}")
self.report.report_dropped(dataset_name)
continue

table_wu_generator = self._process_table(
region, dynamodb_client, table_name, dataset_name
)
yield from classification_workunit_processor(
table_wu_generator,
self.classification_handler,
data_reader,
[region, table_name],
)

def _process_table(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def test_dynamodb(pytestconfig, tmp_path):
"config": {
"aws_access_key_id": "test",
"aws_secret_access_key": "test",
"aws_session_token": "test",
"aws_region": "us-west-2",
},
},
"sink": {
Expand Down Expand Up @@ -97,6 +99,8 @@ def test_dynamodb(pytestconfig, tmp_path):
"platform_instance": "dynamodb_test",
"aws_access_key_id": "test",
"aws_secret_access_key": "test",
"aws_session_token": "test",
"aws_region": "us-west-2",
"classification": ClassificationConfig(
enabled=True,
classifiers=[
Expand Down
Loading