From 4c2b218352487616a0e81aac8d9bbb14e2605f92 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 29 Jul 2024 11:14:00 +0200 Subject: [PATCH] Add transformer for DMS (PostgreSQL full-load and CDC) to CrateDB SQL --- CHANGES.md | 1 + pyproject.toml | 1 + src/commons_codec/model.py | 45 +++++ src/commons_codec/transform/aws_dms.py | 132 ++++++++++++++ tests/transform/test_aws_dms.py | 241 +++++++++++++++++++++++++ 5 files changed, 420 insertions(+) create mode 100644 src/commons_codec/model.py create mode 100644 src/commons_codec/transform/aws_dms.py create mode 100644 tests/transform/test_aws_dms.py diff --git a/CHANGES.md b/CHANGES.md index 4c07594..5949414 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Added transformer for DMS (PostgreSQL full-load and CDC) to CrateDB SQL ## 2024/07/19 v0.0.2 - Added transformer for MongoDB CDC to CrateDB SQL conversion diff --git a/pyproject.toml b/pyproject.toml index 4616d84..3104c5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,7 @@ dynamic = [ "version", ] dependencies = [ + "attrs<24", "simplejson<4", "toolz<0.13", ] diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py new file mode 100644 index 0000000..e48a728 --- /dev/null +++ b/src/commons_codec/model.py @@ -0,0 +1,45 @@ +from enum import StrEnum, auto + +from attrs import define + + +@define(frozen=True) +class TableAddress: + schema: str + table: str + + @property + def fqn(self): + return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}" + + @staticmethod + def quote_identifier(name: str) -> str: + """ + Poor man's table quoting. + + TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. + """ + if name and '"' not in name: + name = f'"{name}"' + return name + + +class ColumnType(StrEnum): + MAP = auto() + + +@define(frozen=True) +class ColumnTypeMap: + column: str + type: ColumnType + + +class PrimaryKeyStore(dict): + pass + + +class ColumnTypeMapStore(dict): + def add(self, table: TableAddress, column: str, type_: ColumnType): + self.setdefault(table, {}) + self[table][column] = type_ + return self diff --git a/src/commons_codec/transform/aws_dms.py b/src/commons_codec/transform/aws_dms.py new file mode 100644 index 0000000..7bf69e0 --- /dev/null +++ b/src/commons_codec/transform/aws_dms.py @@ -0,0 +1,132 @@ +# Copyright (c) 2023-2024, The Kotori Developers and contributors. +# Distributed under the terms of the LGPLv3 license, see LICENSE. + +# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction + +import logging +import typing as t + +import simplejson as json + +from commons_codec.model import ColumnType, ColumnTypeMapStore, PrimaryKeyStore, TableAddress + +logger = logging.getLogger(__name__) + + +class DMSTranslatorCrateDBRecord: + # Define name of the column where CDC's record data will get materialized into. + DATA_COLUMN = "data" + + def __init__( + self, + record: t.Dict[str, t.Any], + container: "DMSTranslatorCrateDB", + ): + self.record = record + self.container = container + + self.metadata: t.Dict[str, t.Any] = self.record.get("metadata", {}) + self.control: t.Dict[str, t.Any] = self.record.get("control", {}) + self.data: t.Dict[str, t.Any] = self.record.get("data", {}) + + self.operation: t.Union[str, None] = self.metadata.get("operation") + + if not self.metadata or not self.operation: + raise ValueError("Record does not have DMS shape: metadata and/or operation is missing") + + try: + self.schema: str = self.metadata["schema-name"] + self.table: str = self.metadata["table-name"] + self.address: TableAddress = TableAddress(schema=self.schema, table=self.table) + except KeyError as ex: + raise ValueError("Schema or table name missing or empty") from ex + + self.container.primary_keys.setdefault(self.address, []) + self.container.column_types.setdefault(self.address, {}) + self.primary_keys: t.List[str] = self.container.primary_keys[self.address] + self.column_types: t.Dict[str, ColumnType] = self.container.column_types[self.address] + + def to_sql(self) -> str: + if self.operation == "create-table": + pks = self.control.get("table-def", {}).get("primary-key") + if pks: + self.primary_keys += pks + return f"CREATE TABLE {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" + + elif self.operation in ["load", "insert"]: + values_clause = self.record_to_values() + sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" + + elif self.operation == "update": + values_clause = self.record_to_values() + where_clause = self.keys_to_where() + sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};" + + elif self.operation == "delete": + where_clause = self.keys_to_where() + sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};" + + else: + raise ValueError(f"Unknown CDC event operation: {self.operation}") + + return sql + + def record_to_values(self) -> str: + """ + Apply type translations to record, and serialize to JSON. + + IN (top-level stripped): + "data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"} + + OUT: + {"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} + """ + for column_name, column_type in self.column_types.items(): + if column_name in self.data: + value = self.data[column_name] + # DMS marshals JSON|JSONB to CLOB, aka. string. Apply a countermeasure. + if column_type is ColumnType.MAP and isinstance(value, str): + value = json.loads(value) + self.data[column_name] = value + return json.dumps(self.data) + + def keys_to_where(self) -> str: + """ + Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax. + """ + if not self.primary_keys: + raise ValueError("Unable to invoke DML operation without primary key information") + constraints: t.List[str] = [] + for key_name in self.primary_keys: + key_value = self.data.get(key_name) + # FIXME: Does the quoting of the value on the right hand side need to take the data type into account? + constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'" + constraints.append(constraint) + return " AND ".join(constraints) + + +class DMSTranslatorCrateDB: + """ + Translate DMS full-load and cdc events into CrateDB SQL statements that materialize them again. + + The SQL DDL schema for CrateDB: + CREATE TABLE (data OBJECT(DYNAMIC)); + + Blueprint: + https://www.cockroachlabs.com/docs/stable/aws-dms + """ + + def __init__( + self, + primary_keys: PrimaryKeyStore = None, + column_types: ColumnTypeMapStore = None, + ): + self.primary_keys = primary_keys or PrimaryKeyStore() + self.column_types = column_types or ColumnTypeMapStore() + + def to_sql(self, record: t.Dict[str, t.Any]) -> str: + """ + Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record. + """ + record_decoded = DMSTranslatorCrateDBRecord(record=record, container=self) + return record_decoded.to_sql() diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py new file mode 100644 index 0000000..c987a84 --- /dev/null +++ b/tests/transform/test_aws_dms.py @@ -0,0 +1,241 @@ +# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction +import json + +import pytest +from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress +from commons_codec.transform.aws_dms import DMSTranslatorCrateDB + +RECORD_INSERT = {"age": 31, "attributes": {"baz": "qux"}, "id": 46, "name": "Jane"} +RECORD_UPDATE = {"age": 33, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} + +MSG_UNKNOWN_SHAPE = { + "unknown": "foo:bar", +} +MSG_SCHEMA_TABLE_MISSING = { + "control": {}, + "metadata": { + "operation": "insert", + }, +} +MSG_UNKNOWN_OPERATION = { + "control": {}, + "metadata": { + "operation": "FOOBAR", + "schema-name": "public", + "table-name": "foo", + }, +} + +MSG_CONTROL_DROP_TABLE = { + "control": {}, + "metadata": { + "operation": "drop-table", + "partition-key-type": "task-id", + "partition-key-value": "serv-res-id-1722195358878-yhru", + "record-type": "control", + "schema-name": "public", + "table-name": "foo", + "timestamp": "2024-07-29T00:30:47.258815Z", + }, +} + +MSG_CONTROL_CREATE_TABLE = { + "control": { + "table-def": { + "columns": { + "age": {"nullable": True, "type": "INT32"}, + "attributes": {"nullable": True, "type": "STRING"}, + "id": {"nullable": False, "type": "INT32"}, + "name": {"nullable": True, "type": "STRING"}, + }, + "primary-key": ["id"], + } + }, + "metadata": { + "operation": "create-table", + "partition-key-type": "task-id", + "partition-key-value": "serv-res-id-1722195358878-yhru", + "record-type": "control", + "schema-name": "public", + "table-name": "foo", + "timestamp": "2024-07-29T00:30:47.266581Z", + }, +} + +MSG_DATA_LOAD = { + "data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"}, + "metadata": { + "operation": "load", + "partition-key-type": "primary-key", + "partition-key-value": "public.foo.42", + "record-type": "data", + "schema-name": "public", + "table-name": "foo", + "timestamp": "2024-07-29T00:57:35.691762Z", + }, +} + +MSG_DATA_INSERT = { + "data": {"age": 31, "attributes": '{"baz": "qux"}', "id": 46, "name": "Jane"}, + "metadata": { + "commit-timestamp": "2024-07-29T00:58:17.974340Z", + "operation": "insert", + "partition-key-type": "schema-table", + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/7C007178.3.00000002/7C007178", + "table-name": "foo", + "timestamp": "2024-07-29T00:58:17.983670Z", + "transaction-id": 1139, + "transaction-record-id": 1, + }, +} + +MSG_DATA_UPDATE_VALUE = { + "before-image": {}, + "data": {"age": 33, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"}, + "metadata": { + "commit-timestamp": "2024-07-29T00:58:44.886717Z", + "operation": "update", + "partition-key-type": "schema-table", + "prev-transaction-id": 1139, + "prev-transaction-record-id": 1, + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/7C007328.2.00000002/7C007328", + "table-name": "foo", + "timestamp": "2024-07-29T00:58:44.895275Z", + "transaction-id": 1140, + "transaction-record-id": 1, + }, +} + +MSG_DATA_UPDATE_PK = { + "before-image": {"id": 46}, + "data": {"age": 31, "attributes": '{"baz": "qux"}', "id": 45, "name": "Jane"}, + "metadata": { + "commit-timestamp": "2024-07-29T00:59:07.678294Z", + "operation": "update", + "partition-key-type": "schema-table", + "prev-transaction-id": 1140, + "prev-transaction-record-id": 1, + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/7C0073F8.2.00000002/7C0073F8", + "table-name": "foo", + "timestamp": "2024-07-29T00:59:07.686557Z", + "transaction-id": 1141, + "transaction-record-id": 1, + }, +} + +MSG_DATA_DELETE = { + "data": {"age": None, "attributes": None, "id": 45, "name": None}, + "metadata": { + "commit-timestamp": "2024-07-29T01:09:25.366257Z", + "operation": "delete", + "partition-key-type": "schema-table", + "prev-transaction-id": 1141, + "prev-transaction-record-id": 1, + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/840001D8.2.00000002/840001D8", + "table-name": "foo", + "timestamp": "2024-07-29T01:09:25.375525Z", + "transaction-id": 1144, + "transaction-record-id": 1, + }, +} + + +@pytest.fixture +def cdc(): + """ + Provide fresh translator instance. + """ + column_types = ColumnTypeMapStore().add( + table=TableAddress(schema="public", table="foo"), + column="attributes", + type_=ColumnType.MAP, + ) + return DMSTranslatorCrateDB(column_types=column_types) + + +def test_decode_cdc_unknown_source(cdc): + with pytest.raises(ValueError) as ex: + cdc.to_sql(MSG_UNKNOWN_SHAPE) + assert ex.match("Record does not have DMS shape: metadata and/or operation is missing") + + +def test_decode_cdc_missing_schema_or_table(cdc): + with pytest.raises(ValueError) as ex: + cdc.to_sql(MSG_SCHEMA_TABLE_MISSING) + assert ex.match("Schema or table name missing or empty") + + +def test_decode_cdc_unknown_event(cdc): + with pytest.raises(ValueError) as ex: + cdc.to_sql(MSG_UNKNOWN_OPERATION) + assert ex.match("Unknown CDC event operation: FOOBAR") + + +def test_decode_cdc_sql_ddl(cdc): + assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == 'CREATE TABLE "public"."foo" (data OBJECT(DYNAMIC));' + + +def test_decode_cdc_insert(cdc): + assert ( + cdc.to_sql(MSG_DATA_INSERT) == 'INSERT INTO "public"."foo" (data) VALUES ' f"('{json.dumps(RECORD_INSERT)}');" + ) + + +def test_decode_cdc_update_success(cdc): + """ + Update statements need schema knowledge about primary keys. + """ + # Seed translator with control message, describing the table schema. + cdc.to_sql(MSG_CONTROL_CREATE_TABLE) + + # Emulate an UPDATE operation. + assert ( + cdc.to_sql(MSG_DATA_UPDATE_VALUE) == 'UPDATE "public"."foo" ' + f"SET data = '{json.dumps(RECORD_UPDATE)}' " + "WHERE data['id'] = '42';" + ) + + +def test_decode_cdc_update_failure(): + """ + Update statements without schema knowledge are not possible. + + When no `create-table` statement has been processed yet, + the machinery doesn't know about primary keys. + """ + # Emulate an UPDATE operation without seeding the translator. + with pytest.raises(ValueError) as ex: + DMSTranslatorCrateDB().to_sql(MSG_DATA_UPDATE_VALUE) + assert ex.match("Unable to invoke DML operation without primary key information") + + +def test_decode_cdc_delete_success(cdc): + """ + Delete statements need schema knowledge about primary keys. + """ + # Seed translator with control message, describing the table schema. + cdc.to_sql(MSG_CONTROL_CREATE_TABLE) + + # Emulate a DELETE operation. + assert cdc.to_sql(MSG_DATA_DELETE) == 'DELETE FROM "public"."foo" ' "WHERE data['id'] = '45';" + + +def test_decode_cdc_delete_failure(cdc): + """ + Delete statements without schema knowledge are not possible. + + When no `create-table` statement has been processed yet, + the machinery doesn't know about primary keys. + """ + # Emulate an DELETE operation without seeding the translator. + with pytest.raises(ValueError) as ex: + DMSTranslatorCrateDB().to_sql(MSG_DATA_DELETE) + assert ex.match("Unable to invoke DML operation without primary key information")