DynamoDB IAM auth (datahub-project#10419)
eboneil authored and sleeperdeep committed Jun 25, 2024
1 parent a46e6c4 commit dbccd64
Showing 6 changed files with 40 additions and 40 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -20,6 +20,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 
 ### Breaking Changes
 
+- #10419 - `aws_session_token` and `aws_region` are now required configurations in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
+
 ### Potential Downtime
 
 ### Deprecations
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md
@@ -1,5 +1,7 @@
 ### Prerequisites
 
+Notice of breaking change: in the latest version of the DynamoDB connector, both `aws_session_token` and `aws_region` are required configurations. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
+
 In order to execute this source, you need to attach the `AmazonDynamoDBReadOnlyAccess` policy to a user in your AWS account. Then create an API access key and secret for the user.
 
 For a user to be able to create an API access key, they need the following access key permissions. Your AWS account admin can create a policy with these permissions and attach it to the user; you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html)
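As an illustration of the access-key step above, here is a minimal boto3 sketch of the `CreateAccessKey` call that such a policy must permit; the user name `datahub-ingest` is a hypothetical placeholder.

```python
# Minimal sketch, assuming AWS admin credentials are already configured
# and a hypothetical IAM user named "datahub-ingest" exists.
import boto3

iam = boto3.client("iam")

# This call requires the iam:CreateAccessKey permission on the user.
response = iam.create_access_key(UserName="datahub-ingest")
print(response["AccessKey"]["AccessKeyId"])
print(response["AccessKey"]["SecretAccessKey"])
```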
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml
@@ -4,6 +4,8 @@ source:
     platform_instance: "AWS_ACCOUNT_ID"
     aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
     aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
+    aws_session_token: "${AWS_SESSION_TOKEN}"
+    aws_region: "${AWS_REGION}"
     #
     # If there are items that have most representative fields of the table, users could use the
     # `include_table_item` option to provide a list of primary keys of the table in dynamodb format.
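For users who run ingestion programmatically rather than through a recipe file, a minimal sketch using DataHub's `Pipeline` API is below; the file sink and the environment-variable handling are illustrative assumptions, not part of this change.

```python
# A minimal sketch, assuming the DynamoDB plugin is installed
# (pip install 'acryl-datahub[dynamodb]'). The file sink and the
# environment-variable names mirror the recipe above and are placeholders.
import os

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "dynamodb",
            "config": {
                "platform_instance": "AWS_ACCOUNT_ID",
                "aws_access_key_id": os.environ["AWS_ACCESS_KEY_ID"],
                "aws_secret_access_key": os.environ["AWS_SECRET_ACCESS_KEY"],
                # Both of these are now required by the connector.
                "aws_session_token": os.environ["AWS_SESSION_TOKEN"],
                "aws_region": os.environ["AWS_REGION"],
            },
        },
        "sink": {"type": "file", "config": {"filename": "./dynamodb_mces.json"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```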
metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py
@@ -14,6 +14,7 @@
 from datahub.configuration.source_common import EnvConfigMixin
 
 if TYPE_CHECKING:
+    from mypy_boto3_dynamodb import DynamoDBClient
     from mypy_boto3_glue import GlueClient
     from mypy_boto3_s3 import S3Client, S3ServiceResource
     from mypy_boto3_sagemaker import SageMakerClient
@@ -214,6 +215,9 @@ def get_s3_resource(
     def get_glue_client(self) -> "GlueClient":
         return self.get_session().client("glue", config=self._aws_config())
 
+    def get_dynamodb_client(self) -> "DynamoDBClient":
+        return self.get_session().client("dynamodb", config=self._aws_config())
+
     def get_sagemaker_client(self) -> "SageMakerClient":
         return self.get_session().client("sagemaker", config=self._aws_config())
 
@@ -224,6 +228,7 @@ class AwsSourceConfig(EnvConfigMixin, AwsConnectionConfig):
     Currently used by:
     - Glue source
+    - DynamoDB source
     - SageMaker source
     """
 
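To see how the new helper is meant to be used, a minimal sketch follows, assuming the `AwsConnectionConfig` fields shown in the recipe above; the credential values come from the environment and are placeholders.

```python
# Minimal sketch, assuming AwsConnectionConfig exposes the aws_* fields
# used above; credential values are placeholders read from the environment.
import os

from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig

conn = AwsConnectionConfig(
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_session_token=os.environ["AWS_SESSION_TOKEN"],
    aws_region=os.environ["AWS_REGION"],
)

# The returned client is an ordinary boto3 DynamoDB client, scoped to
# exactly one region: the one set in the config.
client = conn.get_dynamodb_client()
print(client.meta.region_name)
print(client.list_tables()["TableNames"])
```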
metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py
@@ -12,8 +12,6 @@
     Union,
 )
 
-import boto3
-import pydantic
 from pydantic.fields import Field
 
 from datahub.configuration.common import AllowDenyPattern
@@ -42,6 +40,7 @@
     ClassificationSourceConfigMixin,
     classification_workunit_processor,
 )
+from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
 from datahub.ingestion.source.dynamodb.data_reader import DynamoDBTableItemsReader
 from datahub.ingestion.source.schema_inference.object import SchemaDescription
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -93,12 +92,8 @@ class DynamoDBConfig(
     DatasetSourceConfigMixin,
     StatefulIngestionConfigBase,
     ClassificationSourceConfigMixin,
+    AwsSourceConfig,
 ):
-    # TODO: refactor the config to use AwsConnectionConfig and create a method get_dynamodb_client
-    # in the class to provide optional region name input
-    aws_access_key_id: str = Field(description="AWS Access Key ID.")
-    aws_secret_access_key: pydantic.SecretStr = Field(description="AWS Secret Key.")
-
     domain: Dict[str, AllowDenyPattern] = Field(
         default=dict(),
         description="regex patterns for tables to filter to assign domain_key. ",
@@ -120,6 +115,10 @@ class DynamoDBConfig(
     # Custom Stateful Ingestion settings
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
 
+    @property
+    def dynamodb_client(self):
+        return self.get_dynamodb_client()
+
 
 @dataclass
 class DynamoDBSourceReport(StaleEntityRemovalSourceReport, ClassificationReportMixin):
@@ -212,41 +211,27 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         ]
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
-        # This is a offline call to get available region names from botocore library
-        session = boto3.Session()
-        dynamodb_regions = session.get_available_regions("dynamodb")
-        logger.info(f"region names {dynamodb_regions}")
-
-        # traverse databases in sorted order so output is consistent
-        for region in dynamodb_regions:
-            logger.info(f"Processing region {region}")
-            # create a new dynamodb client for each region,
-            # it seems for one client we could only list the table of one specific region,
-            # the list_tables() method don't take any config that related to region
-            dynamodb_client = boto3.client(
-                "dynamodb",
-                region_name=region,
-                aws_access_key_id=self.config.aws_access_key_id,
-                aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(),
-            )
-            data_reader = DynamoDBTableItemsReader.create(dynamodb_client)
+        dynamodb_client = self.config.dynamodb_client
+        region = dynamodb_client.meta.region_name
 
-            for table_name in self._list_tables(dynamodb_client):
-                dataset_name = f"{region}.{table_name}"
-                if not self.config.table_pattern.allowed(dataset_name):
-                    logger.debug(f"skipping table: {dataset_name}")
-                    self.report.report_dropped(dataset_name)
-                    continue
+        data_reader = DynamoDBTableItemsReader.create(dynamodb_client)
 
-                table_wu_generator = self._process_table(
-                    region, dynamodb_client, table_name, dataset_name
-                )
-                yield from classification_workunit_processor(
-                    table_wu_generator,
-                    self.classification_handler,
-                    data_reader,
-                    [region, table_name],
-                )
+        for table_name in self._list_tables(dynamodb_client):
+            dataset_name = f"{region}.{table_name}"
+            if not self.config.table_pattern.allowed(dataset_name):
+                logger.debug(f"skipping table: {dataset_name}")
+                self.report.report_dropped(dataset_name)
+                continue
+
+            table_wu_generator = self._process_table(
+                region, dynamodb_client, table_name, dataset_name
+            )
+            yield from classification_workunit_processor(
+                table_wu_generator,
+                self.classification_handler,
+                data_reader,
+                [region, table_name],
+            )
 
     def _process_table(
         self,
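The rewritten `get_workunits_internal` derives its single region from the client itself rather than looping over all regions. A small sketch of that boto3 behavior, with placeholder credentials, might look like this.

```python
# Minimal sketch with placeholder credentials; boto3 records the region a
# client was built with on client.meta.region_name.
import boto3

client = boto3.client(
    "dynamodb",
    region_name="us-west-2",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    aws_session_token="test",
)

# The connector now derives its one region from the client itself.
print(client.meta.region_name)  # "us-west-2"

# Paginating list_tables covers accounts with more than 100 tables.
paginator = client.get_paginator("list_tables")
for page in paginator.paginate():
    for table_name in page["TableNames"]:
        print(f"us-west-2.{table_name}")
```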
metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py
@@ -68,6 +68,8 @@ def test_dynamodb(pytestconfig, tmp_path):
                 "config": {
                     "aws_access_key_id": "test",
                     "aws_secret_access_key": "test",
+                    "aws_session_token": "test",
+                    "aws_region": "us-west-2",
                 },
             },
             "sink": {
@@ -97,6 +99,8 @@ def test_dynamodb(pytestconfig, tmp_path):
             "platform_instance": "dynamodb_test",
             "aws_access_key_id": "test",
             "aws_secret_access_key": "test",
+            "aws_session_token": "test",
+            "aws_region": "us-west-2",
             "classification": ClassificationConfig(
                 enabled=True,
                 classifiers=[
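The `test` credentials and fixed `us-west-2` region follow the usual pattern for mocked AWS tests. A hedged sketch of such a test, assuming moto >= 5 provides `mock_aws`, might look like this; the `Location` table is an illustrative name.

```python
# A hedged test sketch, assuming moto >= 5 is installed; all credentials
# are fakes that moto accepts without contacting AWS.
import boto3
from moto import mock_aws


@mock_aws
def test_dynamodb_single_region():
    client = boto3.client(
        "dynamodb",
        region_name="us-west-2",
        aws_access_key_id="test",
        aws_secret_access_key="test",
        aws_session_token="test",
    )
    client.create_table(
        TableName="Location",
        KeySchema=[{"AttributeName": "partitionKey", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "partitionKey", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    assert client.list_tables()["TableNames"] == ["Location"]
```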
