From e18504f906f47a353247db9133f5ad23dbee91a3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 30 Sep 2024 18:03:06 +0200 Subject: [PATCH] DynamoDB: Change CrateDB data model to use (`pk`, `data`, `aux`) columns By breaking the primary key information out of the main record's data bucket, the main record can be updated as-is on CDC MODIFY operations. --- CHANGES.md | 2 ++ cratedb_toolkit/io/dynamodb/adapter.py | 16 ++++++++++++++++ cratedb_toolkit/io/dynamodb/copy.py | 11 +++++++---- tests/io/dynamodb/test_copy.py | 19 +++++++++++++------ tests/io/dynamodb/test_relay.py | 7 +++++-- tests/io/test_processor.py | 4 +++- 6 files changed, 46 insertions(+), 13 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index bfcaea5..934a0bd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,8 @@ ## Unreleased - MongoDB: Updated to pymongo 4.9 +- DynamoDB: Change CrateDB data model to use (`pk`, `data`, `aux`) columns + Attention: This is a breaking change. ## 2024/09/26 v0.0.26 - MongoDB: Configure `MongoDBCrateDBConverter` after updating to commons-codec 0.0.18 diff --git a/cratedb_toolkit/io/dynamodb/adapter.py b/cratedb_toolkit/io/dynamodb/adapter.py index 6d88a5e..f7f6453 100644 --- a/cratedb_toolkit/io/dynamodb/adapter.py +++ b/cratedb_toolkit/io/dynamodb/adapter.py @@ -2,6 +2,7 @@ import typing as t import boto3 +from commons_codec.transform.dynamodb_model import PrimaryKeySchema from yarl import URL logger = logging.getLogger(__name__) @@ -55,3 +56,18 @@ def scan( def count_records(self, table_name: str): table = self.dynamodb_resource.Table(table_name) return table.item_count + + def primary_key_schema(self, table_name: str) -> PrimaryKeySchema: + """ + Return primary key information for given table, derived from `KeySchema` [1] and `AttributeDefinition` [2]. 
+
+        AttributeType:
+        - S - the attribute is of type String
+        - N - the attribute is of type Number
+        - B - the attribute is of type Binary
+
+        [1] https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html#DDB-CreateTable-request-KeySchema
+        [2] https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeDefinition.html
+        """
+        table = self.dynamodb_resource.Table(table_name)
+        return PrimaryKeySchema.from_table(table)
diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py
index 4f5a59d..184ae9e 100644
--- a/cratedb_toolkit/io/dynamodb/copy.py
+++ b/cratedb_toolkit/io/dynamodb/copy.py
@@ -41,7 +41,6 @@ def __init__(
         self.dynamodb_table = self.dynamodb_url.path.lstrip("/")
         self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
         self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
-        self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)
         self.on_error = on_error
         self.progress = progress
 
@@ -54,12 +53,16 @@ def start(self):
         """
        Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
        """
-        records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
+        records_in = self.dynamodb_adapter.count_records(table_name=self.dynamodb_table)
         logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
 
+        # FIXME: The primary key schema is re-derived from DynamoDB on every `start()` invocation; consider computing it once in `__init__` (where `self.translator` used to be created).
+ primary_key_schema = self.dynamodb_adapter.primary_key_schema(table_name=self.dynamodb_table) + translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table, primary_key_schema=primary_key_schema) + with self.cratedb_adapter.engine.connect() as connection: if not self.cratedb_adapter.table_exists(self.cratedb_table): - connection.execute(sa.text(self.translator.sql_ddl)) + connection.execute(sa.text(translator.sql_ddl)) connection.commit() records_target = self.cratedb_adapter.count_records(self.cratedb_table) logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") @@ -68,7 +71,7 @@ def start(self): processor = BulkProcessor( connection=connection, data=self.fetch(), - batch_to_operation=self.translator.to_sql, + batch_to_operation=translator.to_sql, progress_bar=progress_bar, on_error=self.on_error, debug=self.debug, diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py index 984cfea..cffa28a 100644 --- a/tests/io/dynamodb/test_copy.py +++ b/tests/io/dynamodb/test_copy.py @@ -12,9 +12,16 @@ def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_ma data_in = { "Id": {"N": "101"}, + "Name": {"S": "Hotzenplotz"}, } - data_out = { - "Id": 101.0, + record_out = { + "pk": { + "Id": 101.0, + }, + "data": { + "Name": "Hotzenplotz", + }, + "aux": {}, } # Define source and target URLs. 
@@ -34,7 +41,7 @@ def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_ma assert cratedb.database.count_records("testdrive.demo") == 1 results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608 - assert results[0]["data"] == data_out + assert results[0] == record_out def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_manager): @@ -48,8 +55,8 @@ def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_ma {"Id": {"N": "3"}, "name": {"S": "Baz"}}, ] data_out = [ - {"data": {"Id": 1, "name": "Foo"}, "aux": {}}, - {"data": {"Id": 3, "name": "Baz"}, "aux": {}}, + {"pk": {"Id": 1}, "data": {"name": "Foo"}, "aux": {}}, + {"pk": {"Id": 3}, "data": {"name": "Baz"}, "aux": {}}, ] # Define source and target URLs. @@ -68,7 +75,7 @@ def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_ma assert cratedb.database.refresh_table("testdrive.demo") is True assert cratedb.database.count_records("testdrive.demo") == 2 - results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY data['Id'];", records=True) # noqa: S608 + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY pk['Id'];", records=True) # noqa: S608 assert results == data_out assert "Dynamic nested arrays are not supported" in caplog.text diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py index 4b60a10..612ed99 100644 --- a/tests/io/dynamodb/test_relay.py +++ b/tests/io/dynamodb/test_relay.py @@ -2,6 +2,7 @@ import time import pytest +from commons_codec.transform.dynamodb_model import PrimaryKeySchema from cratedb_toolkit.io.kinesis.relay import KinesisRelay from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis @@ -33,7 +34,8 @@ def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): table_name = '"testdrive"."demo"' # Create target table. 
- cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + translator = DynamoDBCDCTranslator(table_name=table_name, primary_key_schema=PrimaryKeySchema().add("id", "S")) + cratedb.database.run_sql(translator.sql_ddl) # Define two CDC events: INSERT and UPDATE. events = [ @@ -76,7 +78,8 @@ def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): table_name = '"testdrive"."demo"' # Create target table. - cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + translator = DynamoDBCDCTranslator(table_name=table_name, primary_key_schema=PrimaryKeySchema().add("id", "S")) + cratedb.database.run_sql(translator.sql_ddl) # Define two CDC events: INSERT and UPDATE. events = [ diff --git a/tests/io/test_processor.py b/tests/io/test_processor.py index e30d0e0..a92ad28 100644 --- a/tests/io/test_processor.py +++ b/tests/io/test_processor.py @@ -10,6 +10,7 @@ pytest.importorskip("commons_codec", reason="Only works with commons-codec installed") from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402 +from commons_codec.transform.dynamodb_model import PrimaryKeySchema # noqa: E402 DYNAMODB_CDC_INSERT_NESTED = { "awsRegion": "us-east-1", @@ -122,7 +123,8 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker table_name = '"testdrive"."demo"' # Create target table. - cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + translator = DynamoDBCDCTranslator(table_name=table_name, primary_key_schema=PrimaryKeySchema().add("id", "S")) + cratedb.database.run_sql(translator.sql_ddl) # Configure Lambda processor per environment variables. handler_environment = {