feat(ingestion/dynamodb): implement pagination for list_tables (#8910)
jinlintt authored Oct 5, 2023
1 parent 2fcced6 commit 6310e51
Showing 5 changed files with 65 additions and 59 deletions.
4 changes: 2 additions & 2 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -130,7 +130,7 @@
"name": "dynamodb",
"displayName": "DynamoDB",
"docsUrl": "https://datahubproject.io/docs/metadata-ingestion/",
"recipe": "source:\n type: dynamodb\n config:\n platform_instance: \"AWS_ACCOUNT_ID\"\n aws_access_key_id : '${AWS_ACCESS_KEY_ID}'\n aws_secret_access_key : '${AWS_SECRET_ACCESS_KEY}'\n # User could use the below option to provide a list of primary keys of a table in dynamodb format,\n # those items from given primary keys will be included when we scan the table.\n # For each table we can retrieve up to 16 MB of data, which can contain as many as 100 items.\n # We'll enforce the the primary keys list size not to exceed 100\n # The total items we'll try to retrieve in these two scenarios:\n # 1. If user don't specify include_table_item: we'll retrieve up to 100 items\n # 2. If user specifies include_table_item: we'll retrieve up to 100 items plus user specified items in\n # the table, with a total not more than 200 items\n # include_table_item:\n # table_name:\n # [\n # {\n # 'partition_key_name': { 'attribute_type': 'attribute_value' },\n # 'sort_key_name': { 'attribute_type': 'attribute_value' },\n # },\n # ]"
"recipe": "source:\n type: dynamodb\n config:\n platform_instance: \"AWS_ACCOUNT_ID\"\n aws_access_key_id : '${AWS_ACCESS_KEY_ID}'\n aws_secret_access_key : '${AWS_SECRET_ACCESS_KEY}'\n # If there are items that have most representative fields of the table, users could use the\n # `include_table_item` option to provide a list of primary keys of the table in dynamodb format.\n # For each `region.table`, the list of primary keys can be at most 100.\n # We include these items in addition to the first 100 items in the table when we scan it.\n # include_table_item:\n # region.table_name:\n # [\n # {\n # 'partition_key_name': { 'attribute_type': 'attribute_value' },\n # 'sort_key_name': { 'attribute_type': 'attribute_value' },\n # },\n # ]"
},
{
"urn": "urn:li:dataPlatform:glue",
@@ -223,4 +223,4 @@
"docsUrl": "https://datahubproject.io/docs/metadata-ingestion/",
"recipe": "source:\n type: <source-type>\n config:\n # Source-type specifics config\n <source-configs>"
}
]
]
13 changes: 5 additions & 8 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md
@@ -1,21 +1,18 @@
## Limitations

For each region, the list table operation returns maximum number 100 tables, we need to further improve it by implementing pagination for listing tables

## Advanced Configurations

### Using `include_table_item` config

If there are items that have most representative fields of the table, user could use the `include_table_item` option to provide a list of primary keys of a table in dynamodb format, those items from given primary keys will be included when we scan the table.
If there are items whose fields are most representative of the table, users can use the `include_table_item` option to provide a list of primary keys of the table in dynamodb format. We include these items in addition to the first 100 items in the table when we scan it.

Take [AWS DynamoDB Developer Guide Example tables and data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleTables.html) as an example, if user has a table `Reply` with composite primary key `Id` and `ReplyDateTime`, user can use `include_table_item` to include 2 items as following:
Take [AWS DynamoDB Developer Guide Example tables and data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleTables.html) as an example: if an account has a table `Reply` in the `us-west-2` region with the composite primary key `Id` and `ReplyDateTime`, users can use `include_table_item` to include 2 items as follows:

Example:

```yml
# put the table name and composite key in DynamoDB format
# The table name should be in the format of region.table_name
# The primary keys should be in the DynamoDB format
include_table_item:
Reply:
us-west-2.Reply:
[
{
"ReplyDateTime": { "S": "2015-09-22T19:58:22.947Z" },
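For context, here is a minimal standalone sketch (not part of this commit) of the `batch_get_item` lookup that `include_table_item` drives. The `Id` value is assumed from the AWS sample data set; note that the region lives in the config key, while the request itself uses the bare table name.

```python
import boto3

# The client is created per region, matching the "us-west-2." prefix of the
# config key; batch_get_item takes only the bare table name.
client = boto3.client("dynamodb", region_name="us-west-2")

# Primary keys in DynamoDB attribute-value format, as in the recipe above.
# The Id value is an assumption based on the AWS sample data.
keys = [
    {
        "Id": {"S": "Amazon DynamoDB#DynamoDB Thread 1"},
        "ReplyDateTime": {"S": "2015-09-22T19:58:22.947Z"},
    }
]

response = client.batch_get_item(RequestItems={"Reply": {"Keys": keys}})
for item in response.get("Responses", {}).get("Reply", []):
    print(item)  # each item's attributes are merged into the inferred schema
```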
6 changes: 2 additions & 4 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md
@@ -1,8 +1,8 @@
### Prerequisites

In order to execute this source, you will need to create access key and secret keys that have DynamoDB read access. You can create these policies and attach to your account or can ask your account admin to attach these policies to your account.
In order to execute this source, you need to attach the `AmazonDynamoDBReadOnlyAccess` policy to a user in your AWS account. Then create an API access key and secret for the user.

For access key permissions, you can create a policy with permissions below and attach to your account, you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html)
For a user to be able to create an API access key, they need the following access key permissions. Your AWS account admin can create a policy with these permissions and attach it to the user; you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html)

```json
{
@@ -22,5 +22,3 @@ For access key permissions, you can create a policy with permissions below and a
]
}
```

For DynamoDB read access, you can simply attach AWS managed policy `AmazonDynamoDBReadOnlyAccess` to your account, you can find more details in [Attaching a policy to an IAM user group](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_groups_manage_attach-policy.html)
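As a rough illustration of the setup these docs describe (assuming an admin with IAM permissions and a hypothetical user name), the policy attachment and key creation could be scripted with boto3:

```python
import boto3

iam = boto3.client("iam")
user_name = "datahub-dynamodb-reader"  # hypothetical user name

# Attach the AWS-managed read-only policy named in the docs above.
iam.attach_user_policy(
    UserName=user_name,
    PolicyArn="arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess",
)

# Create the access key pair that the recipe's AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY variables refer to.
key = iam.create_access_key(UserName=user_name)["AccessKey"]
print(key["AccessKeyId"])  # the secret is in key["SecretAccessKey"]; store it safely
```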
16 changes: 7 additions & 9 deletions metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml
@@ -4,16 +4,14 @@ source:
platform_instance: "AWS_ACCOUNT_ID"
aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
# User could use the below option to provide a list of primary keys of a table in dynamodb format,
# those items from given primary keys will be included when we scan the table.
# For each table we can retrieve up to 16 MB of data, which can contain as many as 100 items.
# We'll enforce the the primary keys list size not to exceed 100
# The total items we'll try to retrieve in these two scenarios:
# 1. If user don't specify include_table_item: we'll retrieve up to 100 items
# 2. If user specifies include_table_item: we'll retrieve up to 100 items plus user specified items in
# the table, with a total not more than 200 items
#
# If there are items whose fields are most representative of the table, users can use the
# `include_table_item` option to provide a list of primary keys of the table in dynamodb format.
# For each `region.table`, the list of primary keys can be at most 100.
# We include these items in addition to the first 100 items in the table when we scan it.
#
# include_table_item:
# table_name:
# region.table_name:
# [
# {
# "partition_key_name": { "attribute_type": "attribute_value" },
85 changes: 49 additions & 36 deletions metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py
@@ -1,5 +1,5 @@
import logging
from dataclasses import field
from dataclasses import dataclass, field
from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union

import boto3
@@ -79,12 +79,13 @@ class DynamoDBConfig(DatasetSourceConfigMixin, StatefulIngestionConfigBase):

table_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for tables to filter in ingestion.",
description="Regex patterns for tables to filter in ingestion. The table name format is 'region.table'",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None


@dataclass
class DynamoDBSourceReport(StaleEntityRemovalSourceReport):
filtered: List[str] = field(default_factory=list)

@@ -175,39 +176,30 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

# traverse databases in sorted order so output is consistent
for region in dynamodb_regions:
try:
# create a new dynamodb client for each region,
# it seems for one client we could only list the table of one specific region,
# the list_tables() method don't take any config that related to region
# TODO: list table returns maximum number 100, need to implement pagination here
dynamodb_client = boto3.client(
"dynamodb",
region_name=region,
aws_access_key_id=self.config.aws_access_key_id
if self.config.aws_access_key_id
else None,
aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value()
if self.config.aws_secret_access_key
else None,
)
table_names: List[str] = dynamodb_client.list_tables()["TableNames"]
except Exception as ex:
# TODO: If regions is config input then this would be self.report.report_warning,
# we can create dynamodb client to take aws region or regions as user input
logger.info(f"exception happen in region {region}, skipping: {ex}")
continue
for table_name in sorted(table_names):
if not self.config.table_pattern.allowed(table_name):
logger.info(f"Processing region {region}")
# create a new dynamodb client for each region;
# one client can only list the tables of its own region, since
# the list_tables() method doesn't take any region-related config
dynamodb_client = boto3.client(
"dynamodb",
region_name=region,
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(),
)

for table_name in self._list_tables(dynamodb_client):
dataset_name = f"{region}.{table_name}"
if not self.config.table_pattern.allowed(dataset_name):
logger.debug(f"skipping table: {dataset_name}")
self.report.report_dropped(dataset_name)
continue

logger.debug(f"Processing table: {dataset_name}")
table_info = dynamodb_client.describe_table(TableName=table_name)[
"Table"
]
account_id = table_info["TableArn"].split(":")[4]
if not self.config.table_pattern.allowed(table_name):
self.report.report_dropped(table_name)
continue
platform_instance = self.config.platform_instance or account_id
dataset_name = f"{region}.{table_name}"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
platform_instance=platform_instance,
@@ -222,7 +214,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
)
primary_key_dict = self.extract_primary_key_from_key_schema(table_info)
table_schema = self.construct_schema_from_dynamodb(
dynamodb_client, table_name
dynamodb_client, region, table_name
)
schema_metadata = self.construct_schema_metadata(
table_name,
@@ -254,9 +246,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
aspect=platform_instance_aspect,
).as_workunit()

def _list_tables(
self,
dynamodb_client: BaseClient,
) -> Iterable[str]:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/paginator/ListTables.html
try:
for page in dynamodb_client.get_paginator("list_tables").paginate():
table_names = page.get("TableNames")
if table_names:
yield from table_names
except Exception as ex:
# TODO: if regions were a config input, this would be self.report.report_warning;
# we could make the dynamodb client take an aws region or regions as user input
logger.info(f"Exception happened while listing tables, skipping: {ex}")

def construct_schema_from_dynamodb(
self,
dynamodb_client: BaseClient,
region: str,
table_name: str,
) -> Dict[str, SchemaDescription]:
"""
@@ -275,7 +283,7 @@ def construct_schema_from_dynamodb(
MaxItems is the total number of items to return and PageSize is the size of each page; we assign the same value
to both configs. If MaxItems is greater than PageSize, we expect response_iterator to return MaxItems / PageSize pages
self.include_table_item_to_schema(dynamodb_client, table_name, schema)
self.include_table_item_to_schema(dynamodb_client, region, table_name, schema)
response_iterator = paginator.paginate(
TableName=table_name,
PaginationConfig={
@@ -294,33 +302,38 @@ def construct_schema_from_dynamodb(
def include_table_item_to_schema(
self,
dynamodb_client: Any,
region: str,
table_name: str,
schema: Dict[str, SchemaDescription],
) -> None:
"""
It will look up in the config include_table_item dict to see if the current table name exists as key,
It will look up in the config include_table_item dict to see if "region.table_name" exists as key,
if it exists then get the items by primary key from the table and put it to schema
"""
if self.config.include_table_item is None:
return
if table_name not in self.config.include_table_item.keys():
dataset_name = f"{region}.{table_name}"
if dataset_name not in self.config.include_table_item.keys():
return
primary_key_list = self.config.include_table_item.get(table_name)
primary_key_list = self.config.include_table_item.get(dataset_name)
assert isinstance(primary_key_list, List)
if len(primary_key_list) > MAX_PRIMARY_KEYS_SIZE:
logger.info(
f"the provided primary keys list size exceeded the max size for table {table_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items"
f"the provided primary keys list size exceeded the max size for table {dataset_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items"
)
primary_key_list = primary_key_list[0:MAX_PRIMARY_KEYS_SIZE]
items = []
response = dynamodb_client.batch_get_item(
RequestItems={table_name: {"Keys": primary_key_list}}
).get("Responses", None)
).get("Responses")
if response is None:
logger.error(
f"failed to retrieve item from table {table_name} by the given key {primary_key_list}"
)
return
logger.debug(
f"successfully retrieved {len(primary_key_list)} items based on supplied primary key list"
)
items = response.get(table_name)

self.construct_schema_from_items(items, schema)
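To see the effect of the change in isolation, here is a minimal sketch (assuming region `us-west-2` and the sample `Reply` table from the docs above) of the two boto3 paginators the source now relies on; without `get_paginator`, a bare `list_tables()` call returns at most 100 table names.

```python
import boto3

client = boto3.client("dynamodb", region_name="us-west-2")

# list_tables pagination: iterate pages until every table name is seen,
# instead of stopping at the 100-name ceiling of a single call.
table_names = []
for page in client.get_paginator("list_tables").paginate():
    table_names.extend(page.get("TableNames", []))
print(f"found {len(table_names)} tables")

# scan pagination with MaxItems == PageSize, mirroring the docstring in
# construct_schema_from_dynamodb: a single page of up to 100 items.
for page in client.get_paginator("scan").paginate(
    TableName="Reply",  # assumes the sample table from the docs above
    PaginationConfig={"MaxItems": 100, "PageSize": 100},
):
    for item in page.get("Items", []):
        pass  # each item is an attribute-value map used for schema inference
```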
