Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB Full: Improve error handling wrt. bulk operations vs. usability #263

Merged
merged 4 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- DynamoDB CDC: Add `ctk load table` interface for processing CDC events
- DynamoDB CDC: Accept a few more options for the Kinesis Stream:
batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time
- DynamoDB Full: Improve error handling wrt. bulk operations vs. usability

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
35 changes: 0 additions & 35 deletions cratedb_toolkit/io/dynamodb/backlog.md

This file was deleted.

64 changes: 39 additions & 25 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# ruff: noqa: S608
import logging
import typing as t

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
from tqdm import tqdm
from yarl import URL

from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.data import asbool

Expand All @@ -23,9 +26,12 @@ def __init__(
self,
dynamodb_url: str,
cratedb_url: str,
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname
Expand All @@ -37,6 +43,7 @@ def __init__(
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.on_error = on_error
self.progress = progress
self.debug = debug

Expand All @@ -49,36 +56,43 @@ def start(self):
"""
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception

with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out = 0
for result in self.dynamodb_adapter.scan(
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
batch_size=self.batch_size,
):
result_size = len(result["Items"])
try:
operation = self.translator.to_sql(result["Items"])
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:

processor = BulkProcessor(
connection=connection,
data=self.fetch(),
batch_to_operation=self.translator.to_sql,
progress_bar=progress_bar,
on_error=self.on_error,
debug=self.debug,
)
metrics = processor.start()
logger.info(f"Bulk processor metrics: {metrics}")

logger.info(
"Number of records written: "
f"success={metrics.count_success_total}, error={metrics.count_error_total}"
)
if metrics.count_success_total == 0:
logger.warning("No data has been copied")

return True

def fetch(self) -> t.Generator[t.List[t.Dict[str, t.Any]], None, None]:
"""
Fetch data from DynamoDB. Generate batches of items.
"""
data = self.dynamodb_adapter.scan(
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
batch_size=self.batch_size,
)
for result in data:
yield result["Items"]
6 changes: 4 additions & 2 deletions cratedb_toolkit/io/kinesis/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@
self.kinesis_adapter.consume_forever(self.process_event)

def stop(self):
self.progress_bar.close()
self.kinesis_adapter.stop()
if hasattr(self, "progress_bar"):
self.progress_bar.close()
if hasattr(self, "kinesis_adapter"):
self.kinesis_adapter.stop()

Check warning on line 69 in cratedb_toolkit/io/kinesis/relay.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/kinesis/relay.py#L66-L69

Added lines #L66 - L69 were not covered by tests

def process_event(self, event):
try:
Expand Down
58 changes: 52 additions & 6 deletions tests/io/dynamodb/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import logging
import time
import typing

import botocore
import pytest
from yarl import URL

from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
from tests.io.dynamodb.manager import DynamoDBTestManager

logger = logging.getLogger(__name__)


# Define databases to be deleted before running each test case.
# Define streams to be deleted before running each test case.
RESET_STREAMS = [
"demo",
]

# Define tables to be deleted before running each test case.
RESET_TABLES = [
"ProductCatalog",
]
Expand All @@ -17,11 +27,15 @@
class DynamoDBFixture:
"""
A little helper wrapping Testcontainer's `LocalStackContainer`.

TODO: Generalize into `LocalStackFixture`.
"""

def __init__(self):
self.container = None
self.url = None
self.dynamodb_adapter: typing.Union[DynamoDBAdapter, None] = None
self.kinesis_adapter: typing.Union[KinesisAdapter, None] = None
self.setup()

def setup(self):
Expand All @@ -32,17 +46,49 @@ def setup(self):
self.container.with_services("dynamodb", "kinesis")
self.container.start()

self.dynamodb_adapter = DynamoDBAdapter(URL(f"{self.get_connection_url_dynamodb()}/?region=us-east-1"))
self.kinesis_adapter = KinesisAdapter(
URL(f"{self.get_connection_url_kinesis_dynamodb_cdc()}/?region=us-east-1")
)

def finalize(self):
self.container.stop()

def reset(self):
"""
Drop all databases used for testing.
Reset all resources to provide each test case with a fresh canvas.
"""
self.reset_streams()
self.reset_tables()

def reset_streams(self):
"""
Drop all Kinesis streams used for testing.
"""
kinesis_client = self.kinesis_adapter.kinesis_client
for stream_name in RESET_STREAMS:
try:
kinesis_client.delete_stream(StreamName=stream_name)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise
waiter = kinesis_client.get_waiter("stream_not_exists")
waiter.wait(StreamName=stream_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15})
time.sleep(0.25)

def reset_tables(self):
"""
Drop all DynamoDB tables used for testing.
"""
# FIXME
return
for database_name in RESET_TABLES:
self.client.drop_database(database_name)
dynamodb_client = self.dynamodb_adapter.dynamodb_client
for table_name in RESET_TABLES:
try:
dynamodb_client.delete_table(TableName=table_name)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise
waiter = dynamodb_client.get_waiter("table_not_exists")
waiter.wait(TableName=table_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15})

def get_connection_url_dynamodb(self):
url = URL(self.container.get_url())
Expand Down
57 changes: 48 additions & 9 deletions tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@
pytestmark = pytest.mark.dynamodb


RECORD = {
"Id": {"N": "101"},
}


def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager):
def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
Verify `DynamoDBFullLoad` works as expected.
Verify a basic `DynamoDBFullLoad` works as expected.
"""

data_in = {
"Id": {"N": "101"},
}
data_out = {
"Id": 101.0,
}

# Define source and target URLs.
dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD])
dynamodb_test_manager.load_records(table_name="demo", records=[data_in])

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
Expand All @@ -32,4 +34,41 @@ def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager)
assert cratedb.database.count_records("testdrive.demo") == 1

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608
assert results[0]["data"] == {"Id": 101.0}
assert results[0]["data"] == data_out


def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
Verify a basic `DynamoDBFullLoad` works as expected, this time omitting a warning on an invalid record.
"""

data_in = [
{"Id": {"N": "1"}, "name": {"S": "Foo"}},
{"Id": {"N": "2"}, "name": {"S": "Bar"}, "nested_array": {"L": [{"L": [{"N": "1"}, {"N": "2"}]}]}},
{"Id": {"N": "3"}, "name": {"S": "Baz"}},
]
data_out = [
{"data": {"Id": 1, "name": "Foo"}, "aux": {}},
{"data": {"Id": 3, "name": "Baz"}, "aux": {}},
]

# Define source and target URLs.
dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=data_in)

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
table_loader.start()

# Verify data in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 2

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY data['Id'];", records=True) # noqa: S608
assert results == data_out

assert "Dynamic nested arrays are not supported" in caplog.text
23 changes: 0 additions & 23 deletions tests/io/dynamodb/test_relay.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import threading
import time

import botocore
import pytest

from cratedb_toolkit.io.kinesis.relay import KinesisRelay
Expand Down Expand Up @@ -45,17 +44,6 @@ def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
# Initialize table loader.
table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)

# Delete stream for blank canvas.
try:
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo", EnforceConsumerDeletion=True)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise

# LocalStack needs a while when deleting the Stream.
# FIXME: Can this be made more efficient?
time.sleep(0.5)

# Populate source database with data.
for event in events:
table_loader.kinesis_adapter.produce(event)
Expand Down Expand Up @@ -98,17 +86,6 @@ def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
# Initialize table loader.
table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)

# Delete stream for blank canvas.
try:
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo")
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise

# LocalStack needs a while when deleting the Stream.
# FIXME: Can this be made more efficient instead of waiting multiple times to orchestrate this sequence?
time.sleep(0.5)

# Start event processor / stream consumer in separate thread, consuming forever.
thread = threading.Thread(target=table_loader.start)
thread.start()
Expand Down