diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index 1bd5b6f1f768b..b18384909c33f 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -130,7 +130,7 @@ "name": "dynamodb", "displayName": "DynamoDB", "docsUrl": "https://datahubproject.io/docs/metadata-ingestion/", - "recipe": "source:\n type: dynamodb\n config:\n platform_instance: \"AWS_ACCOUNT_ID\"\n aws_access_key_id : '${AWS_ACCESS_KEY_ID}'\n aws_secret_access_key : '${AWS_SECRET_ACCESS_KEY}'\n # User could use the below option to provide a list of primary keys of a table in dynamodb format,\n # those items from given primary keys will be included when we scan the table.\n # For each table we can retrieve up to 16 MB of data, which can contain as many as 100 items.\n # We'll enforce the the primary keys list size not to exceed 100\n # The total items we'll try to retrieve in these two scenarios:\n # 1. If user don't specify include_table_item: we'll retrieve up to 100 items\n # 2. If user specifies include_table_item: we'll retrieve up to 100 items plus user specified items in\n # the table, with a total not more than 200 items\n # include_table_item:\n # table_name:\n # [\n # {\n # 'partition_key_name': { 'attribute_type': 'attribute_value' },\n # 'sort_key_name': { 'attribute_type': 'attribute_value' },\n # },\n # ]" + "recipe": "source:\n type: dynamodb\n config:\n platform_instance: \"AWS_ACCOUNT_ID\"\n aws_access_key_id : '${AWS_ACCESS_KEY_ID}'\n aws_secret_access_key : '${AWS_SECRET_ACCESS_KEY}'\n # If there are items that have most representative fields of the table, users could use the\n # `include_table_item` option to provide a list of primary keys of the table in dynamodb format.\n # For each `region.table`, the list of primary keys can be at most 100.\n # We include these items in addition to the first 100 items in the table when we scan it.\n # include_table_item:\n # region.table_name:\n # [\n # {\n # 'partition_key_name': { 'attribute_type': 'attribute_value' },\n # 'sort_key_name': { 'attribute_type': 'attribute_value' },\n # },\n # ]" }, { "urn": "urn:li:dataPlatform:glue", @@ -223,4 +223,4 @@ "docsUrl": "https://datahubproject.io/docs/metadata-ingestion/", "recipe": "source:\n type: \n config:\n # Source-type specifics config\n " } -] \ No newline at end of file +] diff --git a/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md b/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md index 7f9a0324c7bc6..a1c0a6e2d4d21 100644 --- a/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md +++ b/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md @@ -1,21 +1,18 @@ -## Limitations - -For each region, the list table operation returns maximum number 100 tables, we need to further improve it by implementing pagination for listing tables - ## Advanced Configurations ### Using `include_table_item` config -If there are items that have most representative fields of the table, user could use the `include_table_item` option to provide a list of primary keys of a table in dynamodb format, those items from given primary keys will be included when we scan the table. +If there are items that have most representative fields of the table, users could use the `include_table_item` option to provide a list of primary keys of the table in dynamodb format. We include these items in addition to the first 100 items in the table when we scan it. -Take [AWS DynamoDB Developer Guide Example tables and data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleTables.html) as an example, if user has a table `Reply` with composite primary key `Id` and `ReplyDateTime`, user can use `include_table_item` to include 2 items as following: +Take [AWS DynamoDB Developer Guide Example tables and data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleTables.html) as an example, if a account has a table `Reply` in the `us-west-2` region with composite primary key `Id` and `ReplyDateTime`, users can use `include_table_item` to include 2 items as following: Example: ```yml -# put the table name and composite key in DynamoDB format +# The table name should be in the format of region.table_name +# The primary keys should be in the DynamoDB format include_table_item: - Reply: + us-west-2.Reply: [ { "ReplyDateTime": { "S": "2015-09-22T19:58:22.947Z" }, diff --git a/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md b/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md index a48e8d5be04aa..598d0ecdb3786 100644 --- a/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md +++ b/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md @@ -1,8 +1,8 @@ ### Prerequisities -In order to execute this source, you will need to create access key and secret keys that have DynamoDB read access. You can create these policies and attach to your account or can ask your account admin to attach these policies to your account. +In order to execute this source, you need to attach the `AmazonDynamoDBReadOnlyAccess` policy to a user in your AWS account. Then create an API access key and secret for the user. -For access key permissions, you can create a policy with permissions below and attach to your account, you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) +For a user to be able to create API access key, it needs the following access key permissions. Your AWS account admin can create a policy with these permissions and attach to the user, you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) ```json { @@ -22,5 +22,3 @@ For access key permissions, you can create a policy with permissions below and a ] } ``` - -For DynamoDB read access, you can simply attach AWS managed policy `AmazonDynamoDBReadOnlyAccess` to your account, you can find more details in [Attaching a policy to an IAM user group](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_groups_manage_attach-policy.html) diff --git a/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml b/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml index bd41637907b5c..4f4edc9a7d496 100644 --- a/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml +++ b/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml @@ -4,16 +4,14 @@ source: platform_instance: "AWS_ACCOUNT_ID" aws_access_key_id: "${AWS_ACCESS_KEY_ID}" aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}" - # User could use the below option to provide a list of primary keys of a table in dynamodb format, - # those items from given primary keys will be included when we scan the table. - # For each table we can retrieve up to 16 MB of data, which can contain as many as 100 items. - # We'll enforce the the primary keys list size not to exceed 100 - # The total items we'll try to retrieve in these two scenarios: - # 1. If user don't specify include_table_item: we'll retrieve up to 100 items - # 2. If user specifies include_table_item: we'll retrieve up to 100 items plus user specified items in - # the table, with a total not more than 200 items + # + # If there are items that have most representative fields of the table, users could use the + # `include_table_item` option to provide a list of primary keys of the table in dynamodb format. + # For each `region.table`, the list of primary keys can be at most 100. + # We include these items in addition to the first 100 items in the table when we scan it. + # # include_table_item: - # table_name: + # region.table_name: # [ # { # "partition_key_name": { "attribute_type": "attribute_value" }, diff --git a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py index 6b7c118373673..d7f3dfb9279fb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py @@ -1,5 +1,5 @@ import logging -from dataclasses import field +from dataclasses import dataclass, field from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union import boto3 @@ -79,12 +79,13 @@ class DynamoDBConfig(DatasetSourceConfigMixin, StatefulIngestionConfigBase): table_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="regex patterns for tables to filter in ingestion.", + description="Regex patterns for tables to filter in ingestion. The table name format is 'region.table'", ) # Custom Stateful Ingestion settings stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None +@dataclass class DynamoDBSourceReport(StaleEntityRemovalSourceReport): filtered: List[str] = field(default_factory=list) @@ -175,39 +176,30 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # traverse databases in sorted order so output is consistent for region in dynamodb_regions: - try: - # create a new dynamodb client for each region, - # it seems for one client we could only list the table of one specific region, - # the list_tables() method don't take any config that related to region - # TODO: list table returns maximum number 100, need to implement pagination here - dynamodb_client = boto3.client( - "dynamodb", - region_name=region, - aws_access_key_id=self.config.aws_access_key_id - if self.config.aws_access_key_id - else None, - aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value() - if self.config.aws_secret_access_key - else None, - ) - table_names: List[str] = dynamodb_client.list_tables()["TableNames"] - except Exception as ex: - # TODO: If regions is config input then this would be self.report.report_warning, - # we can create dynamodb client to take aws region or regions as user input - logger.info(f"exception happen in region {region}, skipping: {ex}") - continue - for table_name in sorted(table_names): - if not self.config.table_pattern.allowed(table_name): + logger.info(f"Processing region {region}") + # create a new dynamodb client for each region, + # it seems for one client we could only list the table of one specific region, + # the list_tables() method don't take any config that related to region + dynamodb_client = boto3.client( + "dynamodb", + region_name=region, + aws_access_key_id=self.config.aws_access_key_id, + aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(), + ) + + for table_name in self._list_tables(dynamodb_client): + dataset_name = f"{region}.{table_name}" + if not self.config.table_pattern.allowed(dataset_name): + logger.debug(f"skipping table: {dataset_name}") + self.report.report_dropped(dataset_name) continue + + logger.debug(f"Processing table: {dataset_name}") table_info = dynamodb_client.describe_table(TableName=table_name)[ "Table" ] account_id = table_info["TableArn"].split(":")[4] - if not self.config.table_pattern.allowed(table_name): - self.report.report_dropped(table_name) - continue platform_instance = self.config.platform_instance or account_id - dataset_name = f"{region}.{table_name}" dataset_urn = make_dataset_urn_with_platform_instance( platform=self.platform, platform_instance=platform_instance, @@ -222,7 +214,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) primary_key_dict = self.extract_primary_key_from_key_schema(table_info) table_schema = self.construct_schema_from_dynamodb( - dynamodb_client, table_name + dynamodb_client, region, table_name ) schema_metadata = self.construct_schema_metadata( table_name, @@ -254,9 +246,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: aspect=platform_instance_aspect, ).as_workunit() + def _list_tables( + self, + dynamodb_client: BaseClient, + ) -> Iterable[str]: + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/paginator/ListTables.html + try: + for page in dynamodb_client.get_paginator("list_tables").paginate(): + table_names = page.get("TableNames") + if table_names: + yield from table_names + except Exception as ex: + # TODO: If regions is config input then this would be self.report.report_warning, + # we can create dynamodb client to take aws region or regions as user input + logger.info(f"Exception happened while listing tables, skipping: {ex}") + def construct_schema_from_dynamodb( self, dynamodb_client: BaseClient, + region: str, table_name: str, ) -> Dict[str, SchemaDescription]: """ @@ -275,7 +283,7 @@ def construct_schema_from_dynamodb( The MaxItems is the total number of items to return, and PageSize is the size of each page, we are assigning same value to these two config. If MaxItems is more than PageSize then we expect MaxItems / PageSize pages in response_iterator will return """ - self.include_table_item_to_schema(dynamodb_client, table_name, schema) + self.include_table_item_to_schema(dynamodb_client, region, table_name, schema) response_iterator = paginator.paginate( TableName=table_name, PaginationConfig={ @@ -294,33 +302,38 @@ def construct_schema_from_dynamodb( def include_table_item_to_schema( self, dynamodb_client: Any, + region: str, table_name: str, schema: Dict[str, SchemaDescription], ) -> None: """ - It will look up in the config include_table_item dict to see if the current table name exists as key, + It will look up in the config include_table_item dict to see if "region.table_name" exists as key, if it exists then get the items by primary key from the table and put it to schema """ if self.config.include_table_item is None: return - if table_name not in self.config.include_table_item.keys(): + dataset_name = f"{region}.{table_name}" + if dataset_name not in self.config.include_table_item.keys(): return - primary_key_list = self.config.include_table_item.get(table_name) + primary_key_list = self.config.include_table_item.get(dataset_name) assert isinstance(primary_key_list, List) if len(primary_key_list) > MAX_PRIMARY_KEYS_SIZE: logger.info( - f"the provided primary keys list size exceeded the max size for table {table_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items" + f"the provided primary keys list size exceeded the max size for table {dataset_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items" ) primary_key_list = primary_key_list[0:MAX_PRIMARY_KEYS_SIZE] items = [] response = dynamodb_client.batch_get_item( RequestItems={table_name: {"Keys": primary_key_list}} - ).get("Responses", None) + ).get("Responses") if response is None: logger.error( f"failed to retrieve item from table {table_name} by the given key {primary_key_list}" ) return + logger.debug( + f"successfully retrieved {len(primary_key_list)} items based on supplied primary key list" + ) items = response.get(table_name) self.construct_schema_from_items(items, schema)