From 5b43bb59afdbc414bcea6a93e203daa23eca0e8f Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 3 Sep 2024 05:43:37 +0200 Subject: [PATCH] MongoDB: Sanitize lists of varying objects --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/copy.py | 9 +++--- cratedb_toolkit/io/mongodb/export.py | 43 ++++++++++++++++++++++++++++ tests/io/mongodb/test_cli.py | 6 ++-- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e674ba3..5c43831 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ - MongoDB: Skip leaking `UNKNOWN` fields into SQL DDL. This means relevant column definitions will not be included into the SQL DDL. - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. +- MongoDB: Sanitize lists of varying objects ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index d57b71f..a735a1b 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -37,7 +37,7 @@ def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation: """ # Define SQL INSERT statement. - sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);" + sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" # Converge MongoDB document to SQL parameters. record = extract_value(self.decode_bson(document)) @@ -100,16 +100,15 @@ def start(self): records_out = 0 for document in self.mongodb_collection.find().limit(self.mongodb_limit): - result_size = 1 - try: operation = self.translator.to_sql(document) - logger.info("operation: %s", operation) + logger.debug("SQL operation: %s", operation) except Exception as ex: logger_on_error(f"Transforming query failed: {ex}") continue try: - connection.execute(sa.text(operation.statement), operation.parameters) + result = connection.execute(sa.text(operation.statement), operation.parameters) + result_size = result.rowcount records_out += result_size progress_bar.update(n=result_size) except Exception as ex: diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 74ec9a6..c91caf4 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -25,6 +25,7 @@ """ import base64 +import builtins import calendar import typing as t from uuid import UUID @@ -33,6 +34,7 @@ import dateutil.parser as dateparser import orjson as json import pymongo.collection +from attrs import define from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names @@ -75,6 +77,10 @@ def extract_value(value, parent_type=None): return extract_value(v, k.lstrip("$")) return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()} if isinstance(value, list): + if value and isinstance(value[0], dict): + lovos = ListOfVaryingObjectsSanitizer(value) + lovos.apply() + return [extract_value(v, parent_type) for v in value] if parent_type: converter = type_converter.get(parent_type) @@ -83,6 +89,43 @@ def extract_value(value, parent_type=None): return value +@define +class ListOfVaryingObjectsSanitizer: + """ + CrateDB can not store lists of varying objects, so normalize them. + """ + + data: t.List[t.Dict[str, t.Any]] + + def apply(self): + self.apply_rules(self.get_rules(self.type_stats())) + + def type_stats(self) -> t.Dict[str, t.List[str]]: + types: t.Dict[str, t.List[str]] = {} + for item in self.data: + for key, value in item.items(): + types.setdefault(key, []).append(type(value).__name__) + return types + + def get_rules(self, all_types): + rules = [] + for name, types in all_types.items(): + if len(types) > 1: + rules.append({"name": name, "converter": self.get_best_converter(types)}) + return rules + + def apply_rules(self, rules): + for item in self.data: + for rule in rules: + item[rule["name"]] = rule["converter"](item[rule["name"]]) + + @staticmethod + def get_best_converter(types: t.List[str]) -> t.Callable: + if "str" in types: + return builtins.str + return lambda x: x + + def convert(d): """ Decode MongoDB Extended JSON, considering CrateDB specifics. diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index 7c32487..37a768b 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -47,7 +47,8 @@ def test_version(): "list_empty": [], "list_float": [42.42, 43.43], "list_integer": [42, 43], - "list_object": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_symmetric": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_varying": [{"value": 42}, {"value": "qux"}], "list_string": ["foo", "bar"], }, } @@ -65,7 +66,8 @@ def test_version(): "list_empty": [], "list_float": [42.42, 43.43], "list_integer": [42, 43], - "list_object": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_symmetric": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_varying": [{"value": "42"}, {"value": "qux"}], "list_string": ["foo", "bar"], }, },