Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/dynamodb): implement pagination for list_tables #8910

Merged
merged 12 commits into from
Oct 5, 2023
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from dataclasses import field
from dataclasses import dataclass, field
from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union

import boto3
Expand Down Expand Up @@ -85,6 +85,7 @@ class DynamoDBConfig(DatasetSourceConfigMixin, StatefulIngestionConfigBase):
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None


@dataclass
class DynamoDBSourceReport(StaleEntityRemovalSourceReport):
filtered: List[str] = field(default_factory=list)

Expand Down Expand Up @@ -175,39 +176,30 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

# traverse databases in sorted order so output is consistent
for region in dynamodb_regions:
try:
# create a new dynamodb client for each region,
# it seems for one client we could only list the table of one specific region,
# the list_tables() method don't take any config that related to region
# TODO: list table returns maximum number 100, need to implement pagination here
dynamodb_client = boto3.client(
"dynamodb",
region_name=region,
aws_access_key_id=self.config.aws_access_key_id
if self.config.aws_access_key_id
else None,
aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value()
if self.config.aws_secret_access_key
else None,
)
table_names: List[str] = dynamodb_client.list_tables()["TableNames"]
except Exception as ex:
# TODO: If regions is config input then this would be self.report.report_warning,
# we can create dynamodb client to take aws region or regions as user input
logger.info(f"exception happen in region {region}, skipping: {ex}")
continue
for table_name in sorted(table_names):
if not self.config.table_pattern.allowed(table_name):
logger.info(f"Processing region {region}")
# create a new dynamodb client for each region,
# it seems for one client we could only list the table of one specific region,
# the list_tables() method don't take any config that related to region
dynamodb_client = boto3.client(
"dynamodb",
region_name=region,
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(),
)

for table_name in self._list_tables(dynamodb_client):
dataset_name = f"{region}.{table_name}"
if not self.config.table_pattern.allowed(dataset_name):
jinlintt marked this conversation as resolved.
Show resolved Hide resolved
logger.info(f"skipping table: {dataset_name}")
jinlintt marked this conversation as resolved.
Show resolved Hide resolved
self.report.report_dropped(dataset_name)
continue

logger.info(f"Processing table: {dataset_name}")
jinlintt marked this conversation as resolved.
Show resolved Hide resolved
table_info = dynamodb_client.describe_table(TableName=table_name)[
"Table"
]
account_id = table_info["TableArn"].split(":")[4]
if not self.config.table_pattern.allowed(table_name):
self.report.report_dropped(table_name)
continue
platform_instance = self.config.platform_instance or account_id
dataset_name = f"{region}.{table_name}"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
platform_instance=platform_instance,
Expand All @@ -222,7 +214,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
)
primary_key_dict = self.extract_primary_key_from_key_schema(table_info)
table_schema = self.construct_schema_from_dynamodb(
dynamodb_client, table_name
dynamodb_client, region, table_name
)
schema_metadata = self.construct_schema_metadata(
table_name,
Expand Down Expand Up @@ -254,9 +246,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
aspect=platform_instance_aspect,
).as_workunit()

def _list_tables(
self,
dynamodb_client: BaseClient,
) -> Iterable[str]:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/paginator/ListTables.html
try:
for page in dynamodb_client.get_paginator("list_tables").paginate():
table_names = page.get("TableNames")
if table_names:
yield from table_names
except Exception as ex:
# TODO: If regions is config input then this would be self.report.report_warning,
# we can create dynamodb client to take aws region or regions as user input
logger.info(f"Exception happened while listing tables, skipping: {ex}")

def construct_schema_from_dynamodb(
self,
dynamodb_client: BaseClient,
region: str,
table_name: str,
) -> Dict[str, SchemaDescription]:
"""
Expand All @@ -275,7 +283,7 @@ def construct_schema_from_dynamodb(
The MaxItems is the total number of items to return, and PageSize is the size of each page, we are assigning same value
to these two config. If MaxItems is more than PageSize then we expect MaxItems / PageSize pages in response_iterator will return
"""
self.include_table_item_to_schema(dynamodb_client, table_name, schema)
self.include_table_item_to_schema(dynamodb_client, region, table_name, schema)
response_iterator = paginator.paginate(
TableName=table_name,
PaginationConfig={
Expand All @@ -294,33 +302,38 @@ def construct_schema_from_dynamodb(
def include_table_item_to_schema(
self,
dynamodb_client: Any,
region: str,
table_name: str,
schema: Dict[str, SchemaDescription],
) -> None:
"""
It will look up in the config include_table_item dict to see if the current table name exists as key,
It will look up in the config include_table_item dict to see if "region.table_name" exists as key,
jinlintt marked this conversation as resolved.
Show resolved Hide resolved
if it exists then get the items by primary key from the table and put it to schema
"""
if self.config.include_table_item is None:
return
if table_name not in self.config.include_table_item.keys():
dataset_name = f"{region}.{table_name}"
if dataset_name not in self.config.include_table_item.keys():
return
primary_key_list = self.config.include_table_item.get(table_name)
primary_key_list = self.config.include_table_item.get(dataset_name)
assert isinstance(primary_key_list, List)
if len(primary_key_list) > MAX_PRIMARY_KEYS_SIZE:
logger.info(
f"the provided primary keys list size exceeded the max size for table {table_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items"
f"the provided primary keys list size exceeded the max size for table {dataset_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items"
)
primary_key_list = primary_key_list[0:MAX_PRIMARY_KEYS_SIZE]
items = []
response = dynamodb_client.batch_get_item(
RequestItems={table_name: {"Keys": primary_key_list}}
).get("Responses", None)
).get("Responses")
if response is None:
logger.error(
f"failed to retrieve item from table {table_name} by the given key {primary_key_list}"
)
return
logger.info(
jinlintt marked this conversation as resolved.
Show resolved Hide resolved
f"successfully retrieved {len(primary_key_list)} items based on supplied primary key list"
)
items = response.get(table_name)

self.construct_schema_from_items(items, schema)
Expand Down
Loading