Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace poor man's relation name quoting with implementation from sqlalchemy-cratedb #38

Merged
merged 1 commit into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Replace poor man's relation name quoting with implementation
`quote_relation_name` from `sqlalchemy-cratedb` package.

## 2024/08/27 v0.0.13
- DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values
Expand Down
5 changes: 3 additions & 2 deletions examples/mongodb_cdc_cratedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import pymongo
import sqlalchemy as sa
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB

Expand All @@ -33,7 +34,7 @@ def __init__(
self.cratedb_client = sa.create_engine(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = cratedb_table
self.table_name = quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)

def start(self):
Expand All @@ -45,7 +46,7 @@ def start(self):
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
connection.execute(sa.text(f"REFRESH TABLE {self.cdc.quote_table_name(self.table_name)};"))
connection.execute(sa.text(f"REFRESH TABLE {self.table_name};"))

def cdc_to_sql(self):
"""
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ dependencies = [
"backports-strenum<1.3; python_version<'3.11'",
"cattrs<24",
"simplejson<4",
"sqlalchemy-cratedb>=0.39.0",
"toolz<0.13",
]
optional-dependencies.all = [
Expand Down
17 changes: 4 additions & 13 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import sys
import typing as t
from enum import auto
from functools import cached_property

from attr import Factory
from attrs import define
from sqlalchemy_cratedb.support import quote_relation_name

if sys.version_info >= (3, 11):
from enum import StrEnum
Expand All @@ -17,22 +19,11 @@ class TableAddress:
schema: str
table: str

@property
@cached_property
def fqn(self):
if not self.schema:
raise ValueError("Unable to compute a full-qualified table name without schema name")
return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}"

@staticmethod
def quote_identifier(name: str) -> str:
"""
Poor man's table quoting.

TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if name and '"' not in name:
name = f'"{name}"'
return name
return quote_relation_name(f"{self.schema}.{self.table}")


class ColumnType(StrEnum):
Expand Down
14 changes: 2 additions & 12 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing as t

import toolz
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import (
SQLOperation,
Expand Down Expand Up @@ -48,7 +49,7 @@ class DynamoTranslatorBase:

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)
self.table_name = quote_relation_name(table_name)
self.deserializer = CrateDBTypeDeserializer()

@property
Expand All @@ -58,17 +59,6 @@ def sql_ddl(self):
"""
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.

TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name and "." not in name:
name = f'"{name}"'
return name

def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
"""
Deserialize DynamoDB JSON record into vanilla Python.
Expand Down
14 changes: 2 additions & 12 deletions src/commons_codec/transform/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typing as t

from bson.json_util import _json_convert
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import SQLOperation

Expand Down Expand Up @@ -101,7 +102,7 @@ class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase):

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)
self.table_name = quote_relation_name(table_name)

@property
def sql_ddl(self):
Expand Down Expand Up @@ -184,14 +185,3 @@ def where_clause(self, record: t.Dict[str, t.Any]) -> str:
"""
oid = self.get_document_key(record)
return f"oid = '{oid}'"

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.

TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name:
name = f'"{name}"'
return name
11 changes: 8 additions & 3 deletions tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress


def test_table_address_success():
def test_table_address_basic():
ta = TableAddress(schema="foo", table="bar")
assert ta.fqn == '"foo"."bar"'
assert ta.fqn == "foo.bar"


def test_table_address_quoting():
ta = TableAddress(schema="select", table="from")
assert ta.fqn == '"select"."from"'


def test_table_address_failure():
ta = TableAddress(schema=None, table="bar")
with pytest.raises(ValueError) as ex:
_ = ta.fqn
assert ex.match("adcdc")
assert ex.match("Unable to compute a full-qualified table name without schema name")


def test_column_type_map_store_serialize():
Expand Down
10 changes: 5 additions & 5 deletions tests/transform/test_aws_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,19 @@ def test_decode_cdc_unknown_event(cdc):

def test_decode_cdc_sql_ddl_regular(cdc):
assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == SQLOperation(
statement='CREATE TABLE IF NOT EXISTS "public"."foo" (data OBJECT(DYNAMIC));', parameters=None
statement="CREATE TABLE IF NOT EXISTS public.foo (data OBJECT(DYNAMIC));", parameters=None
)


def test_decode_cdc_sql_ddl_awsdms(cdc):
assert cdc.to_sql(MSG_CONTROL_AWSDMS) == SQLOperation(
statement='CREATE TABLE IF NOT EXISTS "dms"."awsdms_apply_exceptions" (data OBJECT(DYNAMIC));', parameters=None
statement="CREATE TABLE IF NOT EXISTS dms.awsdms_apply_exceptions (data OBJECT(DYNAMIC));", parameters=None
)


def test_decode_cdc_insert(cdc):
assert cdc.to_sql(MSG_DATA_INSERT) == SQLOperation(
statement='INSERT INTO "public"."foo" (data) VALUES (:record);', parameters={"record": RECORD_INSERT}
statement="INSERT INTO public.foo (data) VALUES (:record);", parameters={"record": RECORD_INSERT}
)


Expand All @@ -238,7 +238,7 @@ def test_decode_cdc_update_success(cdc):

# Emulate an UPDATE operation.
assert cdc.to_sql(MSG_DATA_UPDATE_VALUE) == SQLOperation(
statement='UPDATE "public"."foo" SET '
statement="UPDATE public.foo SET "
"data['age']=:age, data['attributes']=:attributes, data['name']=:name "
"WHERE data['id']=:id;",
parameters=RECORD_UPDATE,
Expand Down Expand Up @@ -267,7 +267,7 @@ def test_decode_cdc_delete_success(cdc):

# Emulate a DELETE operation.
assert cdc.to_sql(MSG_DATA_DELETE) == SQLOperation(
statement='DELETE FROM "public"."foo" WHERE data[\'id\']=:id;', parameters={"id": 45}
statement="DELETE FROM public.foo WHERE data['id']=:id;", parameters={"id": 45}
)


Expand Down
12 changes: 6 additions & 6 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def test_decode_ddb_deserialize_type():


def test_decode_cdc_sql_ddl():
assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"


def test_decode_cdc_unknown_source():
Expand All @@ -196,7 +196,7 @@ def test_decode_cdc_unknown_event():

def test_decode_cdc_insert_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"humidity": 84.84,
Expand All @@ -213,7 +213,7 @@ def test_decode_cdc_insert_basic():

def test_decode_cdc_insert_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -230,7 +230,7 @@ def test_decode_cdc_insert_nested():

def test_decode_cdc_modify_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation(
statement='UPDATE "foo" SET '
statement="UPDATE foo SET "
"data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['empty_string']=:empty_string, data['null_string']=:null_string "
Expand All @@ -252,7 +252,7 @@ def test_decode_cdc_modify_basic():

def test_decode_cdc_modify_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation(
statement='UPDATE "foo" SET '
statement="UPDATE foo SET "
"data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) "
Expand All @@ -274,7 +274,7 @@ def test_decode_cdc_modify_nested():

def test_decode_cdc_remove():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_REMOVE) == SQLOperation(
statement="DELETE FROM \"foo\" WHERE data['device']=:device AND data['timestamp']=:timestamp;",
statement="DELETE FROM foo WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "bar",
"timestamp": "2024-07-12T01:17:42",
Expand Down
7 changes: 3 additions & 4 deletions tests/transform/test_dynamodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@

def test_sql_ddl():
assert (
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl
== 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"
)


def test_to_sql_all_types():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_ALL_TYPES) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -59,7 +58,7 @@ def test_to_sql_all_types():

def test_to_sql_list_of_objects():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_UTM) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"utmTags": [
Expand Down
10 changes: 5 additions & 5 deletions tests/transform/test_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
def test_decode_cdc_sql_ddl():
assert (
MongoDBCDCTranslatorCrateDB(table_name="foo").sql_ddl
== 'CREATE TABLE IF NOT EXISTS "foo" (oid TEXT, data OBJECT(DYNAMIC));'
== "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));"
)


Expand All @@ -120,7 +120,7 @@ def test_decode_cdc_optype_empty():

def test_decode_cdc_insert():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == SQLOperation(
statement='INSERT INTO "foo" (oid, data) VALUES (:oid, :record);',
statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);",
parameters={
"oid": "669683c2b0750b2c84893f3e",
"record": {
Expand All @@ -135,7 +135,7 @@ def test_decode_cdc_insert():

def test_decode_cdc_update():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation(
statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={
"record": {
"_id": {"$oid": "669683c2b0750b2c84893f3e"},
Expand All @@ -149,14 +149,14 @@ def test_decode_cdc_update():

def test_decode_cdc_replace():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation(
statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}},
)


def test_decode_cdc_delete():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) == SQLOperation(
statement="DELETE FROM \"foo\" WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None
statement="DELETE FROM foo WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None
)


Expand Down