Skip to content

Commit

Permalink
MongoDB: Sanitize lists of varying objects
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 3, 2024
1 parent 453bd3b commit 59e9c1d
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- MongoDB: Skip leaking `UNKNOWN` fields into SQL DDL.
This means relevant column definitions will not be included into the SQL DDL.
- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.
- MongoDB: Sanitize lists of varying objects, by converting values of varying types to strings.

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
9 changes: 4 additions & 5 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation:
"""

# Define SQL INSERT statement.
sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);"
sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);"

# Converge MongoDB document to SQL parameters.
record = extract_value(self.decode_bson(document))
Expand Down Expand Up @@ -100,16 +100,15 @@ def start(self):
records_out = 0

for document in self.mongodb_collection.find().limit(self.mongodb_limit):
result_size = 1

try:
operation = self.translator.to_sql(document)
logger.info("operation: %s", operation)
logger.debug("SQL operation: %s", operation)
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue
try:
connection.execute(sa.text(operation.statement), operation.parameters)
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
Expand Down
45 changes: 45 additions & 0 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"""

import base64
import builtins
import calendar
import typing as t
from uuid import UUID
Expand All @@ -33,6 +34,7 @@
import dateutil.parser as dateparser
import orjson as json
import pymongo.collection
from attrs import define

from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.io.mongodb.util import sanitize_field_names
Expand Down Expand Up @@ -75,6 +77,10 @@ def extract_value(value, parent_type=None):
return extract_value(v, k.lstrip("$"))
return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()}
if isinstance(value, list):
if value and isinstance(value[0], dict):
lovos = ListOfVaryingObjectsSanitizer(value)
lovos.apply()

return [extract_value(v, parent_type) for v in value]
if parent_type:
converter = type_converter.get(parent_type)
Expand All @@ -83,6 +89,45 @@ def extract_value(value, parent_type=None):
return value


@define
class ListOfVaryingObjectsSanitizer:
    """
    CrateDB can not store lists of varying objects, so normalize them.

    For every key whose values have differing types across the dictionaries
    of the list, the values are converted in place using a common converter.
    Currently, the only effective conversion is "to string", selected when at
    least one of the values is a string already.

    `None` values are treated as "no value": They do not count as a type of
    their own, and they are never converted, so they do not end up as the
    literal string "None". List items which are not dictionaries are ignored.
    """

    # The list to sanitize. Its dictionaries are modified in place.
    data: t.List[t.Dict[str, t.Any]]

    def apply(self) -> None:
        """
        Sanitize `self.data` in place: Collect type statistics, derive
        conversion rules from them, and apply those rules to all items.
        """
        self.apply_rules(self.get_rules(self.type_stats()))

    def type_stats(self) -> t.Dict[str, t.List[str]]:
        """
        Collect the type names of all values, grouped by key.

        :return: Mapping of key name to the list of type names (per
                 `type(value).__name__`) observed for that key, one entry
                 per occurrence, in list order.
        """
        types: t.Dict[str, t.List[str]] = {}
        for item in self.data:
            # The caller only inspects the first list item, so subsequent
            # items are not guaranteed to be dictionaries.
            if not isinstance(item, dict):
                continue
            for key, value in item.items():
                types.setdefault(key, []).append(type(value).__name__)
        return types

    def get_rules(self, all_types: t.Dict[str, t.List[str]]) -> t.List[t.Dict[str, t.Any]]:
        """
        Derive conversion rules from type statistics.

        A rule is emitted only for keys whose values actually vary in type.
        `NoneType` is not counted, because a missing value is compatible
        with any other type.

        :param all_types: Type statistics, as returned by `type_stats`.
        :return: List of rules, each a dictionary with the items `name`
                 (the key to convert) and `converter` (a callable).
        """
        rules = []
        for name, types in all_types.items():
            distinct_types = set(types) - {"NoneType"}
            if len(distinct_types) > 1:
                rules.append({"name": name, "converter": self.get_best_converter(types)})
        return rules

    def apply_rules(self, rules: t.List[t.Dict[str, t.Any]]) -> None:
        """
        Apply conversion rules to all dictionaries in `self.data`, in place.

        :param rules: Conversion rules, as returned by `get_rules`.
        """
        for item in self.data:
            if not isinstance(item, dict):
                continue
            for rule in rules:
                name = rule["name"]
                # Keep `None` as is, instead of converting it to "None".
                if item.get(name) is not None:
                    item[name] = rule["converter"](item[name])

    @staticmethod
    def get_best_converter(types: t.List[str]) -> t.Callable:
        """
        Select a converter function suitable for the given list of type names.

        :param types: Type names observed for a single key.
        :return: `str` if any of the values is a string, otherwise an
                 identity function which returns its argument unchanged.
        """
        if "str" in types:
            return builtins.str
        return lambda x: x


def convert(d):
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
Expand Down
10 changes: 8 additions & 2 deletions tests/io/mongodb/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ def test_version():
"list_empty": [],
"list_float": [42.42, 43.43],
"list_integer": [42, 43],
"list_object": [{"foo": "bar"}, {"baz": "qux"}],
"list_object_symmetric": [{"foo": "bar"}, {"baz": "qux"}],
"list_object_varying_string": [{"value": 42}, {"value": "qux"}],
# TODO: Improve decoding of inner items.
"list_object_varying_date": [{"value": DATETIME}, {"value": "qux"}],
"list_string": ["foo", "bar"],
},
}
Expand All @@ -65,7 +68,10 @@ def test_version():
"list_empty": [],
"list_float": [42.42, 43.43],
"list_integer": [42, 43],
"list_object": [{"foo": "bar"}, {"baz": "qux"}],
"list_object_symmetric": [{"foo": "bar"}, {"baz": "qux"}],
"list_object_varying_string": [{"value": "42"}, {"value": "qux"}],
# TODO: Improve decoding of inner items.
"list_object_varying_date": [{"value": "{'$date': '2020-06-19T15:03:53.727Z'}"}, {"value": "qux"}],
"list_string": ["foo", "bar"],
},
},
Expand Down

0 comments on commit 59e9c1d

Please sign in to comment.