From 28733b24f2f511e88aaa7ef076a139e5d97a609c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 15 Sep 2024 01:10:11 +0200 Subject: [PATCH] DynamoDB Relay: Refactor "drop stream" action into test fixture --- tests/io/dynamodb/conftest.py | 58 +++++++++++++++++++++++++++++---- tests/io/dynamodb/test_relay.py | 23 ------------- 2 files changed, 52 insertions(+), 29 deletions(-) diff --git a/tests/io/dynamodb/conftest.py b/tests/io/dynamodb/conftest.py index c21d0e86..82262a6e 100644 --- a/tests/io/dynamodb/conftest.py +++ b/tests/io/dynamodb/conftest.py @@ -1,14 +1,24 @@ import logging +import time +import typing +import botocore import pytest from yarl import URL +from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter +from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter from tests.io.dynamodb.manager import DynamoDBTestManager logger = logging.getLogger(__name__) -# Define databases to be deleted before running each test case. +# Define streams to be deleted before running each test case. +RESET_STREAMS = [ + "demo", +] + +# Define tables to be deleted before running each test case. RESET_TABLES = [ "ProductCatalog", ] @@ -17,11 +27,15 @@ class DynamoDBFixture: """ A little helper wrapping Testcontainer's `LocalStackContainer`. + + TODO: Generalize into `LocalStackFixture`. """ def __init__(self): self.container = None self.url = None + self.dynamodb_adapter: typing.Union[DynamoDBAdapter, None] = None + self.kinesis_adapter: typing.Union[KinesisAdapter, None] = None self.setup() def setup(self): @@ -32,17 +46,49 @@ def setup(self): self.container.with_services("dynamodb", "kinesis") self.container.start() + self.dynamodb_adapter = DynamoDBAdapter(URL(self.get_connection_url_dynamodb())) + self.kinesis_adapter = KinesisAdapter( + URL(f"{self.get_connection_url_kinesis_dynamodb_cdc()}/?region=us-east-1") + ) + def finalize(self): self.container.stop() def reset(self): """ - Drop all databases used for testing. + Reset all resources to provide each test case with a fresh canvas. + """ + self.reset_streams() + self.reset_tables() + + def reset_streams(self): + """ + Drop all Kinesis streams used for testing. + """ + kinesis_client = self.kinesis_adapter.kinesis_client + for stream_name in RESET_STREAMS: + try: + kinesis_client.delete_stream(StreamName=stream_name) + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + waiter = kinesis_client.get_waiter("stream_not_exists") + waiter.wait(StreamName=stream_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15}) + time.sleep(0.25) + + def reset_tables(self): + """ + Drop all DynamoDB tables used for testing. """ - # FIXME - return - for database_name in RESET_TABLES: - self.client.drop_database(database_name) + dynamodb_client = self.dynamodb_adapter.dynamodb_client + for table_name in RESET_TABLES: + try: + dynamodb_client.delete_table(TableName=table_name) + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + waiter = dynamodb_client.get_waiter("table_not_exists") + waiter.wait(TableName=table_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15}) def get_connection_url_dynamodb(self): url = URL(self.container.get_url()) diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py index 7576cf28..ac30a60a 100644 --- a/tests/io/dynamodb/test_relay.py +++ b/tests/io/dynamodb/test_relay.py @@ -1,7 +1,6 @@ import threading import time -import botocore import pytest from cratedb_toolkit.io.kinesis.relay import KinesisRelay @@ -45,17 +44,6 @@ def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): # Initialize table loader. table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url) - # Delete stream for blank canvas. - try: - table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo", EnforceConsumerDeletion=True) - except botocore.exceptions.ClientError as error: - if error.response["Error"]["Code"] != "ResourceNotFoundException": - raise - - # LocalStack needs a while when deleting the Stream. - # FIXME: Can this be made more efficient? - time.sleep(0.5) - # Populate source database with data. for event in events: table_loader.kinesis_adapter.produce(event) @@ -98,17 +86,6 @@ def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): # Initialize table loader. table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url) - # Delete stream for blank canvas. - try: - table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") - except botocore.exceptions.ClientError as error: - if error.response["Error"]["Code"] != "ResourceNotFoundException": - raise - - # LocalStack needs a while when deleting the Stream. - # FIXME: Can this be made more efficient instead of waiting multiple times to orchestrate this sequence? - time.sleep(0.5) - # Start event processor / stream consumer in separate thread, consuming forever. thread = threading.Thread(target=table_loader.start) thread.start()