Skip to content

Commit

Permalink
Add transformer for DMS (PostgreSQL full-load and CDC) to CrateDB SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 2, 2024
1 parent 3c83455 commit 4c2b218
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Added transformer for DMS (PostgreSQL full-load and CDC) to CrateDB SQL

## 2024/07/19 v0.0.2
- Added transformer for MongoDB CDC to CrateDB SQL conversion
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ dynamic = [
"version",
]
dependencies = [
"attrs<24",
"simplejson<4",
"toolz<0.13",
]
Expand Down
45 changes: 45 additions & 0 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from enum import StrEnum, auto

from attrs import define


@define(frozen=True)
class TableAddress:
schema: str
table: str

@property
def fqn(self):
return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}"

@staticmethod
def quote_identifier(name: str) -> str:
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if name and '"' not in name:
name = f'"{name}"'
return name


class ColumnType(StrEnum):
MAP = auto()


@define(frozen=True)
class ColumnTypeMap:
column: str
type: ColumnType


class PrimaryKeyStore(dict):
pass


class ColumnTypeMapStore(dict):
def add(self, table: TableAddress, column: str, type_: ColumnType):
self.setdefault(table, {})
self[table][column] = type_
return self
132 changes: 132 additions & 0 deletions src/commons_codec/transform/aws_dms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Distributed under the terms of the LGPLv3 license, see LICENSE.

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction

import logging
import typing as t

import simplejson as json

from commons_codec.model import ColumnType, ColumnTypeMapStore, PrimaryKeyStore, TableAddress

logger = logging.getLogger(__name__)


class DMSTranslatorCrateDBRecord:
# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(
self,
record: t.Dict[str, t.Any],
container: "DMSTranslatorCrateDB",
):
self.record = record
self.container = container

self.metadata: t.Dict[str, t.Any] = self.record.get("metadata", {})
self.control: t.Dict[str, t.Any] = self.record.get("control", {})
self.data: t.Dict[str, t.Any] = self.record.get("data", {})

self.operation: t.Union[str, None] = self.metadata.get("operation")

if not self.metadata or not self.operation:
raise ValueError("Record does not have DMS shape: metadata and/or operation is missing")

try:
self.schema: str = self.metadata["schema-name"]
self.table: str = self.metadata["table-name"]
self.address: TableAddress = TableAddress(schema=self.schema, table=self.table)
except KeyError as ex:
raise ValueError("Schema or table name missing or empty") from ex

self.container.primary_keys.setdefault(self.address, [])
self.container.column_types.setdefault(self.address, {})
self.primary_keys: t.List[str] = self.container.primary_keys[self.address]
self.column_types: t.Dict[str, ColumnType] = self.container.column_types[self.address]

def to_sql(self) -> str:
if self.operation == "create-table":
pks = self.control.get("table-def", {}).get("primary-key")
if pks:
self.primary_keys += pks
return f"CREATE TABLE {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

elif self.operation in ["load", "insert"]:
values_clause = self.record_to_values()
sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"

elif self.operation == "update":
values_clause = self.record_to_values()
where_clause = self.keys_to_where()
sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};"

elif self.operation == "delete":
where_clause = self.keys_to_where()
sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};"

else:
raise ValueError(f"Unknown CDC event operation: {self.operation}")

return sql

def record_to_values(self) -> str:
"""
Apply type translations to record, and serialize to JSON.
IN (top-level stripped):
"data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"}
OUT:
{"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"}
"""
for column_name, column_type in self.column_types.items():
if column_name in self.data:
value = self.data[column_name]
# DMS marshals JSON|JSONB to CLOB, aka. string. Apply a countermeasure.
if column_type is ColumnType.MAP and isinstance(value, str):
value = json.loads(value)
self.data[column_name] = value
return json.dumps(self.data)

def keys_to_where(self) -> str:
"""
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.
"""
if not self.primary_keys:
raise ValueError("Unable to invoke DML operation without primary key information")
constraints: t.List[str] = []
for key_name in self.primary_keys:
key_value = self.data.get(key_name)
# FIXME: Does the quoting of the value on the right hand side need to take the data type into account?
constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'"
constraints.append(constraint)
return " AND ".join(constraints)


class DMSTranslatorCrateDB:
"""
Translate DMS full-load and cdc events into CrateDB SQL statements that materialize them again.
The SQL DDL schema for CrateDB:
CREATE TABLE <tablename> (data OBJECT(DYNAMIC));
Blueprint:
https://www.cockroachlabs.com/docs/stable/aws-dms
"""

def __init__(
self,
primary_keys: PrimaryKeyStore = None,
column_types: ColumnTypeMapStore = None,
):
self.primary_keys = primary_keys or PrimaryKeyStore()
self.column_types = column_types or ColumnTypeMapStore()

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
"""
Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record.
"""
record_decoded = DMSTranslatorCrateDBRecord(record=record, container=self)
return record_decoded.to_sql()
Loading

0 comments on commit 4c2b218

Please sign in to comment.