Skip to content

Commit

Permalink
Add transformer for DMS (PostgreSQL full-load and CDC) to CrateDB SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jul 29, 2024
1 parent 4c4ac2c commit 90ffc1b
Show file tree
Hide file tree
Showing 3 changed files with 386 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Added transformer for DMS (PostgreSQL full-load and CDC) to CrateDB SQL

## 2024/07/19 v0.0.2
- Added transformer for MongoDB CDC to CrateDB SQL conversion
Expand Down
150 changes: 150 additions & 0 deletions src/commons_codec/transform/aws_dms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Distributed under the terms of the LGPLv3 license, see LICENSE.

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction

import logging
import typing as t

import simplejson as json

logger = logging.getLogger(__name__)


class DMSTranslatorCrateDBRecord:
    """
    Translate a single DMS record (control or data event) into one CrateDB SQL statement.

    The record is expected to carry a `metadata` section including `operation`,
    `schema-name`, and `table-name`. Row data is materialized into a single
    `OBJECT(DYNAMIC)` column, named by `DATA_COLUMN`.

    The `primary_keys` and `column_types` registries are keyed by `table_key`
    ("<schema>-<table>"). They are shared with the caller and updated in place, so
    primary key information learned from a `create-table` record is available to
    subsequent records addressing the same table.
    """

    # Define name of the column where CDC's record data will get materialized into.
    DATA_COLUMN = "data"

    def __init__(
        self,
        record: t.Dict[str, t.Any],
        primary_keys: t.Dict[str, t.List[str]],
        column_types: t.Dict[str, t.Dict[str, str]],
    ):
        """
        Decode the record envelope and attach the per-table registries.

        :param record: The DMS event record.
        :param primary_keys: Registry of primary key column names per table key. Updated in place.
        :param column_types: Registry of column type overrides per table key. Updated in place.
        :raises ValueError: When `metadata` or `operation` is missing, or when
                            `schema-name` or `table-name` is not present in `metadata`.
        """
        self.record = record

        self.metadata: t.Dict[str, t.Any] = self.record.get("metadata", {})
        self.control: t.Dict[str, t.Any] = self.record.get("control", {})
        self.data: t.Dict[str, t.Any] = self.record.get("data", {})
        # Previous values of changed columns, as conveyed on `update` records.
        self.before_image: t.Dict[str, t.Any] = self.record.get("before-image") or {}

        self.operation: t.Optional[str] = self.metadata.get("operation")

        if not self.metadata or not self.operation:
            raise ValueError("Record does not have DMS shape: metadata and/or operation is missing")

        try:
            self.schema: str = self.metadata["schema-name"]
            self.table: str = self.metadata["table-name"]
        except KeyError as ex:
            raise ValueError("Schema or table name missing or empty") from ex

        # Make sure registry slots exist, then bind to the shared (mutable) objects.
        primary_keys.setdefault(self.table_key, [])
        column_types.setdefault(self.table_key, {})

        self.primary_keys: t.List[str] = primary_keys[self.table_key]
        self.column_types: t.Dict[str, str] = column_types[self.table_key]

    def to_sql(self) -> str:
        """
        Produce the SQL statement for this record.

        - `create-table`: Registers primary keys, and returns a `CREATE TABLE` statement.
        - `load` / `insert`: Returns an `INSERT` statement.
        - `update`: Returns an `UPDATE` statement, constrained by primary key.
        - `delete`: Returns a `DELETE` statement, constrained by primary key.

        :raises ValueError: On unknown operations, or when `update` / `delete`
                            are requested without primary key information.
        """
        if self.operation == "create-table":
            pks = self.control.get("table-def", {}).get("primary-key")
            # Extend the shared list in place, but do not register the same key twice
            # when a `create-table` event for the same table is observed repeatedly.
            for pk in pks or []:
                if pk not in self.primary_keys:
                    self.primary_keys.append(pk)
            return f"CREATE TABLE {self.table_fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

        elif self.operation in ["load", "insert"]:
            values_clause = self._escape_literal(self.record_to_values())
            sql = f"INSERT INTO {self.table_fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"

        elif self.operation == "update":
            values_clause = self._escape_literal(self.record_to_values())
            where_clause = self.keys_to_where()
            sql = f"UPDATE {self.table_fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};"

        elif self.operation == "delete":
            where_clause = self.keys_to_where()
            sql = f"DELETE FROM {self.table_fqn} WHERE {where_clause};"

        else:
            raise ValueError(f"Unknown CDC event operation: {self.operation}")

        return sql

    def record_to_values(self) -> str:
        """
        Serialize the record's `data` section to a JSON document.

        Columns registered as type `map` in `column_types`, which arrive as
        JSON-encoded strings, are decoded into nested objects first. The
        conversion is applied to `self.data` in place.

        IN (top-level stripped):
        "data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"}
        OUT:
        {"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"}

        The returned text is plain JSON. It is not yet escaped for embedding
        into an SQL string literal.
        """
        for column_name, column_type in self.column_types.items():
            if column_name in self.data:
                value = self.data[column_name]
                if column_type == "map" and isinstance(value, str):
                    value = json.loads(value)
                self.data[column_name] = value
        return json.dumps(self.data)

    def keys_to_where(self) -> str:
        """
        Produce an SQL `WHERE` clause addressing the record by its primary key columns.

        When the record's `before-image` includes a primary key column, i.e. the
        key itself has been changed, the previous value is used, because that is
        the value the row currently stored in the target is identified by.
        Otherwise, the value from `data` is used.

        :raises ValueError: When no primary key information is available for the table.
        """
        if not self.primary_keys:
            raise ValueError("Unable to invoke DML operation without primary key information")
        constraints: t.List[str] = []
        for key_name in self.primary_keys:
            if key_name in self.before_image:
                key_value = self.before_image[key_name]
            else:
                key_value = self.data.get(key_name)
            # FIXME: Does the quoting of the value on the right hand side need to take the data type into account?
            constraint = (
                f"{self.DATA_COLUMN}['{self._escape_literal(key_name)}'] = '{self._escape_literal(key_value)}'"
            )
            constraints.append(constraint)
        return " AND ".join(constraints)

    @property
    def table_key(self) -> str:
        """Key used for addressing this record's table within the registries."""
        return f"{self.schema}-{self.table}"

    @property
    def table_fqn(self) -> str:
        """Schema-qualified table name, with both parts quoted as identifiers."""
        return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}"

    @staticmethod
    def _escape_literal(value: t.Any) -> str:
        """
        Render `value` as text suitable for embedding between single quotes in SQL,
        by doubling any single quote characters it contains.
        """
        return str(value).replace("'", "''")

    @staticmethod
    def quote_identifier(name: str) -> str:
        """
        Poor man's table quoting.

        Wraps the name into double quotes, unless it is empty or already
        contains a double quote character.

        TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
        """
        if name and '"' not in name:
            name = f'"{name}"'
        return name


class DMSTranslatorCrateDB:
    """
    Translate DMS full-load and cdc events into CrateDB SQL statements that materialize them again.

    The SQL DDL schema for CrateDB:
    CREATE TABLE <tablename> (data OBJECT(DYNAMIC));

    Blueprint:
    https://www.cockroachlabs.com/docs/stable/aws-dms

    The translator keeps per-table state (primary keys, column types) across
    calls to `to_sql`, so a `create-table` control record can supply the
    primary key information needed for later `update` and `delete` records.
    """

    def __init__(
        self,
        primary_keys: t.Optional[t.Dict[str, t.List[str]]] = None,
        column_types: t.Optional[t.Dict[str, t.Dict[str, str]]] = None,
    ):
        """
        :param primary_keys: Optional registry of primary key column names, keyed by "<schema>-<table>".
        :param column_types: Optional registry of column type overrides, keyed by "<schema>-<table>".

        Dictionaries handed in by the caller are used as-is, including empty ones,
        so the caller can observe what is learned while processing records.
        """
        # Compare against `None` instead of using `or`, which would silently
        # swap a caller-supplied empty dictionary for a detached new one.
        self.primary_keys = {} if primary_keys is None else primary_keys
        self.column_types = {} if column_types is None else column_types

    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
        """
        Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record.

        `create-table` control records produce a `CREATE TABLE` statement.

        :raises ValueError: When the record is malformed, the operation is unknown,
                            or primary key information is missing for `update` / `delete`.
        """
        record_decoded = DMSTranslatorCrateDBRecord(
            record=record, primary_keys=self.primary_keys, column_types=self.column_types
        )
        return record_decoded.to_sql()
235 changes: 235 additions & 0 deletions tests/transform/test_aws_dms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction
import json

import pytest
from commons_codec.transform.aws_dms import DMSTranslatorCrateDB

# Expected materialized documents. They equal the `data` payloads of `MSG_DATA_INSERT`
# and `MSG_DATA_UPDATE_VALUE`, with the JSON-encoded `attributes` string decoded into a dict.
RECORD_INSERT = {"age": 31, "attributes": {"baz": "qux"}, "id": 46, "name": "Jane"}
RECORD_UPDATE = {"age": 33, "attributes": {"foo": "bar"}, "id": 42, "name": "John"}

# Message without any `metadata` section.
MSG_UNKNOWN_SHAPE = {
    "unknown": "foo:bar",
}
# Message with an operation, but without `schema-name` and `table-name`.
MSG_SCHEMA_TABLE_MISSING = {
    "control": {},
    "metadata": {
        "operation": "insert",
    },
}
# Well-formed envelope carrying an operation name the translator does not handle.
MSG_UNKNOWN_OPERATION = {
    "control": {},
    "metadata": {
        "operation": "FOOBAR",
        "schema-name": "public",
        "table-name": "foo",
    },
}

# Control record: `drop-table`.
# NOTE(review): Not referenced by any test visible in this section.
MSG_CONTROL_DROP_TABLE = {
    "control": {},
    "metadata": {
        "operation": "drop-table",
        "partition-key-type": "task-id",
        "partition-key-value": "serv-res-id-1722195358878-yhru",
        "record-type": "control",
        "schema-name": "public",
        "table-name": "foo",
        "timestamp": "2024-07-29T00:30:47.258815Z",
    },
}

# Control record: `create-table`. Carries the table definition, including
# the primary key (`id`), which the tests use to seed the translator.
MSG_CONTROL_CREATE_TABLE = {
    "control": {
        "table-def": {
            "columns": {
                "age": {"nullable": True, "type": "INT32"},
                "attributes": {"nullable": True, "type": "STRING"},
                "id": {"nullable": False, "type": "INT32"},
                "name": {"nullable": True, "type": "STRING"},
            },
            "primary-key": ["id"],
        }
    },
    "metadata": {
        "operation": "create-table",
        "partition-key-type": "task-id",
        "partition-key-value": "serv-res-id-1722195358878-yhru",
        "record-type": "control",
        "schema-name": "public",
        "table-name": "foo",
        "timestamp": "2024-07-29T00:30:47.266581Z",
    },
}

# Data record: `load`. The `attributes` column arrives as a JSON-encoded string.
# NOTE(review): Not referenced by any test visible in this section.
MSG_DATA_LOAD = {
    "data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"},
    "metadata": {
        "operation": "load",
        "partition-key-type": "primary-key",
        "partition-key-value": "public.foo.42",
        "record-type": "data",
        "schema-name": "public",
        "table-name": "foo",
        "timestamp": "2024-07-29T00:57:35.691762Z",
    },
}

# Data record: `insert`.
MSG_DATA_INSERT = {
    "data": {"age": 31, "attributes": '{"baz": "qux"}', "id": 46, "name": "Jane"},
    "metadata": {
        "commit-timestamp": "2024-07-29T00:58:17.974340Z",
        "operation": "insert",
        "partition-key-type": "schema-table",
        "record-type": "data",
        "schema-name": "public",
        "stream-position": "00000002/7C007178.3.00000002/7C007178",
        "table-name": "foo",
        "timestamp": "2024-07-29T00:58:17.983670Z",
        "transaction-id": 1139,
        "transaction-record-id": 1,
    },
}

# Data record: `update` with an empty `before-image`.
MSG_DATA_UPDATE_VALUE = {
    "before-image": {},
    "data": {"age": 33, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"},
    "metadata": {
        "commit-timestamp": "2024-07-29T00:58:44.886717Z",
        "operation": "update",
        "partition-key-type": "schema-table",
        "prev-transaction-id": 1139,
        "prev-transaction-record-id": 1,
        "record-type": "data",
        "schema-name": "public",
        "stream-position": "00000002/7C007328.2.00000002/7C007328",
        "table-name": "foo",
        "timestamp": "2024-07-29T00:58:44.895275Z",
        "transaction-id": 1140,
        "transaction-record-id": 1,
    },
}

# Data record: `update` where `before-image` holds `id` 46 while `data` holds `id` 45.
# NOTE(review): Not referenced by any test visible in this section.
MSG_DATA_UPDATE_PK = {
    "before-image": {"id": 46},
    "data": {"age": 31, "attributes": '{"baz": "qux"}', "id": 45, "name": "Jane"},
    "metadata": {
        "commit-timestamp": "2024-07-29T00:59:07.678294Z",
        "operation": "update",
        "partition-key-type": "schema-table",
        "prev-transaction-id": 1140,
        "prev-transaction-record-id": 1,
        "record-type": "data",
        "schema-name": "public",
        "stream-position": "00000002/7C0073F8.2.00000002/7C0073F8",
        "table-name": "foo",
        "timestamp": "2024-07-29T00:59:07.686557Z",
        "transaction-id": 1141,
        "transaction-record-id": 1,
    },
}

# Data record: `delete`. Only the `id` column carries a value, all others are `None`.
MSG_DATA_DELETE = {
    "data": {"age": None, "attributes": None, "id": 45, "name": None},
    "metadata": {
        "commit-timestamp": "2024-07-29T01:09:25.366257Z",
        "operation": "delete",
        "partition-key-type": "schema-table",
        "prev-transaction-id": 1141,
        "prev-transaction-record-id": 1,
        "record-type": "data",
        "schema-name": "public",
        "stream-position": "00000002/840001D8.2.00000002/840001D8",
        "table-name": "foo",
        "timestamp": "2024-07-29T01:09:25.375525Z",
        "transaction-id": 1144,
        "transaction-record-id": 1,
    },
}


@pytest.fixture
def cdc():
    """
    Hand out a new translator for each test, configured to treat
    the `attributes` column of table `public.foo` as a `map`.
    """
    type_overrides = {"public-foo": {"attributes": "map"}}
    translator = DMSTranslatorCrateDB(column_types=type_overrides)
    return translator


def test_decode_cdc_unknown_source(cdc):
    """A message without `metadata` is rejected."""
    expected_message = "Record does not have DMS shape: metadata and/or operation is missing"
    with pytest.raises(ValueError, match=expected_message):
        cdc.to_sql(MSG_UNKNOWN_SHAPE)


def test_decode_cdc_missing_schema_or_table(cdc):
    """A message lacking `schema-name` and `table-name` is rejected."""
    expected_message = "Schema or table name missing or empty"
    with pytest.raises(ValueError, match=expected_message):
        cdc.to_sql(MSG_SCHEMA_TABLE_MISSING)


def test_decode_cdc_unknown_event(cdc):
    """A message with an unsupported operation name is rejected."""
    expected_message = "Unknown CDC event operation: FOOBAR"
    with pytest.raises(ValueError, match=expected_message):
        cdc.to_sql(MSG_UNKNOWN_OPERATION)


def test_decode_cdc_sql_ddl(cdc):
    """A `create-table` control record yields a `CREATE TABLE` statement."""
    expected = 'CREATE TABLE "public"."foo" (data OBJECT(DYNAMIC));'
    actual = cdc.to_sql(MSG_CONTROL_CREATE_TABLE)
    assert actual == expected


def test_decode_cdc_insert(cdc):
    """An `insert` data record yields an `INSERT` statement carrying the decoded document."""
    expected = 'INSERT INTO "public"."foo" (data) VALUES ' f"('{json.dumps(RECORD_INSERT)}');"
    actual = cdc.to_sql(MSG_DATA_INSERT)
    assert actual == expected


def test_decode_cdc_update_success(cdc):
    """
    An UPDATE can be rendered once the translator knows the primary key.
    """
    # The control message describes the table schema, including its primary key.
    cdc.to_sql(MSG_CONTROL_CREATE_TABLE)

    expected = (
        'UPDATE "public"."foo" '
        f"SET data = '{json.dumps(RECORD_UPDATE)}' "
        "WHERE data['id'] = '42';"
    )

    # Run the UPDATE event through the seeded translator.
    actual = cdc.to_sql(MSG_DATA_UPDATE_VALUE)
    assert actual == expected


def test_decode_cdc_update_failure():
    """
    An UPDATE can not be rendered without schema knowledge.

    As long as no `create-table` record has been processed,
    the translator has no primary key information to build on.
    """
    # Deliberately use a translator which has not been seeded.
    unseeded = DMSTranslatorCrateDB()
    expected_message = "Unable to invoke DML operation without primary key information"
    with pytest.raises(ValueError, match=expected_message):
        unseeded.to_sql(MSG_DATA_UPDATE_VALUE)


def test_decode_cdc_delete_success(cdc):
    """
    A DELETE can be rendered once the translator knows the primary key.
    """
    # The control message describes the table schema, including its primary key.
    cdc.to_sql(MSG_CONTROL_CREATE_TABLE)

    expected = 'DELETE FROM "public"."foo" ' "WHERE data['id'] = '45';"

    # Run the DELETE event through the seeded translator.
    actual = cdc.to_sql(MSG_DATA_DELETE)
    assert actual == expected


def test_decode_cdc_delete_failure(cdc):
    """
    A DELETE can not be rendered without schema knowledge.

    As long as no `create-table` record has been processed,
    the translator has no primary key information to build on.
    """
    # Deliberately use a translator which has not been seeded.
    unseeded = DMSTranslatorCrateDB()
    expected_message = "Unable to invoke DML operation without primary key information"
    with pytest.raises(ValueError, match=expected_message):
        unseeded.to_sql(MSG_DATA_DELETE)

0 comments on commit 90ffc1b

Please sign in to comment.