From dd10dbc83605ed9c362095991e18e6fb574b664d Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 3 Sep 2024 13:18:22 +0200 Subject: [PATCH] MongoDB: `--treatment` option, and ingress pagination / egress batching - Add `--treatment` option for applying special treatments Certain fields should be stored as lists, some need to be ignored for now, others need to be treated manually. - Use pagination on source collection, for creating batches towards CrateDB. --- CHANGES.md | 3 + cratedb_toolkit/api/main.py | 13 ++- cratedb_toolkit/io/cli.py | 4 +- cratedb_toolkit/io/mongodb/api.py | 9 +- cratedb_toolkit/io/mongodb/copy.py | 81 ++++++++++++---- cratedb_toolkit/io/mongodb/export.py | 138 +++++++++++++++++++++------ cratedb_toolkit/io/mongodb/model.py | 15 +++ cratedb_toolkit/io/mongodb/util.py | 4 +- pyproject.toml | 1 + tests/io/mongodb/test_cli.py | 58 ++++++++++- tests/io/mongodb/test_export.py | 38 ++++++++ tests/io/mongodb/test_translate.py | 4 +- tests/io/mongodb/test_util.py | 4 +- 13 files changed, 311 insertions(+), 61 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/model.py create mode 100644 tests/io/mongodb/test_export.py diff --git a/CHANGES.md b/CHANGES.md index 5c43831e..84f29ad9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,9 @@ This means relevant column definitions will not be included into the SQL DDL. - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. - MongoDB: Sanitize lists of varying objects +- MongoDB: Add `--treatment` option for applying special treatments to certain items + on real-world data +- MongoDB: Use pagination on source collection, for creating batches towards CrateDB ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. 
diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 01a3d029..1bb8385e 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -19,7 +19,7 @@ class ClusterBase(abc.ABC): @abstractmethod - def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path): + def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path, treatment: Path): raise NotImplementedError("Child class needs to implement this method") @@ -37,7 +37,11 @@ def __post_init__(self): logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}") def load_table( - self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None + self, + resource: InputOutputResource, + target: t.Optional[TableAddress] = None, + transformation: Path = None, + treatment: Path = None, ): """ Load data into a database table on CrateDB Cloud. @@ -99,7 +103,9 @@ class StandaloneCluster(ClusterBase): address: DatabaseAddress info: t.Optional[ClusterInformation] = None - def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None): + def load_table( + self, resource: InputOutputResource, target: TableAddress, transformation: Path = None, treatment: Path = None + ): """ Load data into a database table on a standalone CrateDB Server. 
@@ -145,6 +151,7 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf source_url, target_url, transformation=transformation, + treatment=treatment, limit=int(source_url_obj.query.get("limit", 0)), progress=True, ): diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 6d188be0..d7edc3e2 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -37,6 +37,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): @click.option("--format", "format_", type=str, required=False, help="File format of the import resource") @click.option("--compression", type=str, required=False, help="Compression format of the import resource") @click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file") +@click.option("--treatment", type=Path, required=False, help="Path to treatment description file") @click.pass_context def load_table( ctx: click.Context, @@ -49,6 +50,7 @@ def load_table( format_: str, compression: str, transformation: Path, + treatment: Path, ): """ Import data into CrateDB and CrateDB Cloud clusters. 
@@ -85,4 +87,4 @@ def load_table( cluster = StandaloneCluster(address=address) else: raise NotImplementedError("Unable to select backend") - return cluster.load_table(resource=resource, target=target, transformation=transformation) + return cluster.load_table(resource=resource, target=target, transformation=transformation, treatment=treatment) diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index e95890c9..9405b9f5 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -5,6 +5,7 @@ from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad from cratedb_toolkit.io.mongodb.core import export, extract, translate +from cratedb_toolkit.io.mongodb.model import Treatment from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util.cr8 import cr8_insert_json @@ -87,7 +88,9 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi return True -def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): +def mongodb_copy( + source_url, target_url, transformation: Path = None, treatment: Path = None, limit: int = 0, progress: bool = False +): """ Transfer MongoDB collection using translator component. @@ -109,6 +112,9 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int tm = None if transformation: tm = TransformationManager(path=transformation) + tt = None + if treatment: + tt = Treatment.from_yaml(treatment.read_text()) # Invoke `full-load` procedure. 
mdb_full = MongoDBFullLoad( @@ -117,6 +123,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int mongodb_collection=mongodb_collection, cratedb_url=target_url, tm=tm, + treatment=tt, progress=progress, ) mdb_full.start() diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index a735a1bc..fa96566c 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -8,8 +8,11 @@ from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from yarl import URL -from cratedb_toolkit.io.mongodb.export import extract_value +from cratedb_toolkit.io.mongodb.export import CrateDBConverter, CrateDBConverterConfig +from cratedb_toolkit.io.mongodb.model import DocumentDict, Treatment from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util import DatabaseAdapter @@ -18,8 +21,13 @@ class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB): - def __init__(self, table_name: str, tm: TransformationManager = None): + """ + Translate a MongoDB document into a CrateDB document. + """ + + def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None): super().__init__(table_name=table_name) + self.converter = converter self.tm = tm @staticmethod @@ -29,20 +37,24 @@ def get_document_key(record: t.Dict[str, t.Any]) -> str: "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} """ - return record["_id"] + return record["__id"] - def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation: + def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation: """ - Produce CrateDB INSERT SQL statement from MongoDB document. + Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. 
""" + if not isinstance(data, list): + data = [data] # Define SQL INSERT statement. sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" - # Converge MongoDB document to SQL parameters. - record = extract_value(self.decode_bson(document)) - oid: str = self.get_document_key(record) - parameters = {"oid": oid, "record": record} + # Converge multiple MongoDB documents into SQL parameters for `executemany` operation. + parameters: t.List[DocumentDict] = [] + for document in data: + record = self.converter.convert(self.decode_bson(document)) + oid: str = self.get_document_key(record) + parameters.append({"oid": oid, "record": record}) return SQLOperation(sql, parameters) @@ -59,7 +71,9 @@ def __init__( mongodb_collection: str, cratedb_url: str, tm: t.Union[TransformationManager, None], + treatment: t.Union[Treatment, None], mongodb_limit: int = 0, + on_error: t.Literal["ignore", "raise"] = "ignore", progress: bool = False, debug: bool = True, ): @@ -67,6 +81,7 @@ def __init__( cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() cratedb_table = cratedb_table_address.fullname + self.mongodb_uri = URL(mongodb_url) self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient( mongodb_url, document_class=RawBSONDocument, @@ -76,11 +91,21 @@ def __init__( self.mongodb_limit = mongodb_limit self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) - self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, tm=tm) + # Transformation machinery. 
+ if treatment: + converter_config = CrateDBConverterConfig(treatment=treatment) + self.converter = CrateDBConverter(config=converter_config) + else: + self.converter = CrateDBConverter() + self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm) + + self.on_error = on_error self.progress = progress self.debug = debug + self.batch_size: int = int(self.mongodb_uri.query.get("batch-size", 250)) + def start(self): """ Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. @@ -90,29 +115,49 @@ def start(self): logger_on_error = logger.warning if self.debug: logger_on_error = logger.exception - with self.cratedb_adapter.engine.connect() as connection: + with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm(): if not self.cratedb_adapter.table_exists(self.cratedb_table): connection.execute(sa.text(self.translator.sql_ddl)) connection.commit() records_target = self.cratedb_adapter.count_records(self.cratedb_table) logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") progress_bar = tqdm(total=records_in) - records_out = 0 + records_out: int = 0 - for document in self.mongodb_collection.find().limit(self.mongodb_limit): + skip: int = 0 + while True: + progress_bar.set_description("ACQUIRE") + # Acquire batch of documents, and convert to SQL operation. + documents = self.mongodb_collection.find().skip(skip).limit(self.batch_size).batch_size(self.batch_size) try: - operation = self.translator.to_sql(document) - logger.debug("SQL operation: %s", operation) + operation = self.translator.to_sql(list(documents)) except Exception as ex: - logger_on_error(f"Transforming query failed: {ex}") - continue + logger_on_error(f"Computing query failed: {ex}") + if self.on_error == "raise": + raise + break + + # When input data is exhausted, stop processing. 
+ progress_bar.set_description("CHECK") + if not operation.parameters: + break + + # Submit operation to CrateDB. + progress_bar.set_description("SUBMIT") try: result = connection.execute(sa.text(operation.statement), operation.parameters) result_size = result.rowcount + if result_size < 0: + raise ValueError("Unable to insert one or more records") records_out += result_size progress_bar.update(n=result_size) except Exception as ex: - logger_on_error(f"Executing query failed: {ex}") + logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}") + if self.on_error == "raise": + raise + + # Next page. + skip += self.batch_size progress_bar.close() connection.commit() diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 2e5d711b..f25dee9f 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -27,6 +27,7 @@ import base64 import builtins import calendar +import logging import typing as t from uuid import UUID @@ -34,11 +35,15 @@ import dateutil.parser as dateparser import orjson as json import pymongo.collection +from attr import Factory from attrs import define +from cratedb_toolkit.io.mongodb.model import DocumentDict, Treatment from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names +logger = logging.getLogger(__name__) + def date_converter(value): if isinstance(value, int): @@ -60,33 +65,110 @@ def timestamp_converter(value): } -def extract_value(value, parent_type=None): - """ - Decode MongoDB Extended JSON. 
+@define +class CrateDBConverterConfig: + treatment: Treatment = Factory(Treatment) - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ - """ - if isinstance(value, dict): - if len(value) == 1: - if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: - decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) - return extract_value(decoded, parent_type) + +@define +class CrateDBConverter: + config: CrateDBConverterConfig = Factory(CrateDBConverterConfig) + + def convert(self, data: DocumentDict) -> t.Dict[str, t.Any]: + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ + newdict = {} + for k, v in sanitize_field_names(data).items(): + newdict[k] = self.extract_value(v) + return newdict + + def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any: + """ + Decode MongoDB Extended JSON. + + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + """ + if isinstance(value, dict): + # Custom adjustments to compensate shape anomalies in source data. 
+ self.apply_special_treatments(value) + if len(value) == 1: + if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: + decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) + return self.extract_value(decoded, parent_type) + for k, v in value.items(): + if k.startswith("$"): + return self.extract_value(v, k.lstrip("$")) + return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()} + if isinstance(value, list): + if value and isinstance(value[0], dict): + lovos = ListOfVaryingObjectsSanitizer(value) + lovos.apply() + + return [self.extract_value(v, parent_type) for v in value] + if parent_type: + converter = type_converter.get(parent_type) + if converter: + return converter(value) + return value + + def apply_special_treatments(self, value): + """ + Apply special treatments to value that can't be described otherwise up until now. + # Ignore certain items including anomalies that are not resolved, yet. + + TODO: Needs an integration test feeding two records instead of just one. + """ + + if self.config is None or self.config.treatment is None: + return + + # Optionally ignore lists of complex objects. + local_ignores = [] + if self.config.treatment.ignore_complex_lists: for k, v in value.items(): - if k.startswith("$"): - return extract_value(v, k.lstrip("$")) - return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()} - if isinstance(value, list): - if value and isinstance(value[0], dict): - lovos = ListOfVaryingObjectsSanitizer(value) - lovos.apply() - - return [extract_value(v, parent_type) for v in value] - if parent_type: - converter = type_converter.get(parent_type) - if converter: - return converter(value) - return value + if isinstance(v, list) and v and isinstance(v[0], dict): + # Skip ignoring special-encoded items. + if v[0] and list(v[0].keys())[0].startswith("$"): + continue + local_ignores.append(k) + + # Apply global and computed ignores. 
+ for ignore_name in self.config.treatment.ignore_columns + local_ignores: + if ignore_name in value: + del value[ignore_name] + + # Converge certain items to `list` even when defined differently. + for to_list_name in self.config.treatment.to_list: + if to_list_name in value and not isinstance(value[to_list_name], list): + value[to_list_name] = [value[to_list_name]] + + # Converge certain items to `str` even when defined differently. + for name in self.config.treatment.to_string: + if name in value and not isinstance(value[name], str): + value[name] = str(value[name]) + + """ + # Manual treatment. + # Some nested objects have been defined as strings, probably in previous schema versions. + if "users" in value: + for user_item in value["users"]: + if "user" in user_item and not isinstance(user_item["user"], dict): + user_item["user"] = {"id": user_item["user"]} + """ + + if "createdBy" in value and not isinstance(value["createdBy"], dict): + value["createdBy"] = {"id": value["createdBy"]} + + # Prune invalid date representations. + for key in ["start_date", "end_date"]: + if key in value: + if not isinstance(value[key], dict): + del value[key] + elif "date" in value[key]: + if isinstance(value[key]["date"], str): + del value[key] @define @@ -132,10 +214,8 @@ def convert(d): """ Decode MongoDB Extended JSON, considering CrateDB specifics. 
""" - newdict = {} - for k, v in sanitize_field_names(d).items(): - newdict[k] = extract_value(v) - return newdict + config = CrateDBConverterConfig() + return CrateDBConverter(config=config).convert(d) def collection_to_json( diff --git a/cratedb_toolkit/io/mongodb/model.py b/cratedb_toolkit/io/mongodb/model.py new file mode 100644 index 00000000..83d918bd --- /dev/null +++ b/cratedb_toolkit/io/mongodb/model.py @@ -0,0 +1,15 @@ +import typing as t + +from attr import Factory +from attrs import define +from zyp.model.base import Dumpable + +DocumentDict = t.Dict[str, t.Any] + + +@define +class Treatment(Dumpable): + ignore_complex_lists: bool = False + ignore_columns: t.List[str] = Factory(list) + to_list: t.List[str] = Factory(list) + to_string: t.List[str] = Factory(list) diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 2e5d0f6b..529e5f6b 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,6 +1,6 @@ import re -import typing as t +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.util.data_dict import OrderedDictX @@ -26,7 +26,7 @@ def parse_input_numbers(s: str): return options -def sanitize_field_names(data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: +def sanitize_field_names(data: DocumentDict) -> DocumentDict: """ Rename top-level column names with single leading underscores to double leading underscores. CrateDB does not accept singe leading underscores, like `_id`. 
diff --git a/pyproject.toml b/pyproject.toml index 17d49248..9aad0991 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ dynamic = [ "version", ] dependencies = [ + "attrs<25", "boltons<25", "click<9", "click-aliases<2,>=1.0.4", diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index a97b7243..fd8054e1 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -1,12 +1,15 @@ import os +from copy import deepcopy from unittest import mock from uuid import UUID import dateutil import pytest import sqlparse +import yaml from click.testing import CliRunner from pueblo.testing.dataframe import DataFrameFactory +from toolz import dissoc from cratedb_toolkit.cli import cli from tests.conftest import check_sqlalchemy2 @@ -57,7 +60,7 @@ def test_version(): DOCUMENT_OUT = { "oid": mock.ANY, "data": { - "_id": mock.ANY, + "__id": mock.ANY, "id": "d575540f-759d-4653-a4c4-4a9e410f1aa1", "value": { "name": "foobar", @@ -76,6 +79,14 @@ def test_version(): }, }, } +DOCUMENT_OUT_NO_COMPLEX_LISTS = deepcopy(DOCUMENT_OUT) +DOCUMENT_OUT_NO_COMPLEX_LISTS["data"]["value"] = dissoc( + DOCUMENT_OUT["data"]["value"], + "list_object_symmetric", + "list_object_varying_string", + "list_object_varying_date", +) + DOCUMENT_DDL = """ CREATE TABLE IF NOT EXISTS "testdrive"."demo" ( "oid" TEXT, @@ -114,12 +125,12 @@ def test_mongodb_load_table_basic(caplog, cratedb, mongodb): assert cratedb.database.count_records("testdrive.demo") == 42 -def test_mongodb_load_table_real(caplog, cratedb, mongodb): +def test_mongodb_load_table_complex_lists_include(caplog, cratedb, mongodb): """ CLI test: Invoke `ctk load table` for MongoDB. """ - cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database. 
client: pymongo.MongoClient = mongodb.get_connection_client() @@ -150,3 +161,44 @@ def test_mongodb_load_table_real(caplog, cratedb, mongodb): sql = results[0][0] sql = sqlparse.format(sql) assert sql.startswith(DOCUMENT_DDL) + + +def test_mongodb_load_table_complex_lists_ignore(caplog, tmp_path, cratedb, mongodb): + """ + CLI test: Invoke `ctk load table` for MongoDB, with special parameter to ignore complex lists. + """ + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + treatment_path = tmp_path / "treatment.yaml" + treatment_path.write_text(yaml.safe_dump({"ignore_complex_lists": True})) + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_many([DOCUMENT_IN]) + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + result = runner.invoke( + cli, + args=f"load table {mongodb_url} --treatment={treatment_path}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 1 + + # Verify content in target database. + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo", records=True) + assert results[0] == DOCUMENT_OUT_NO_COMPLEX_LISTS + + # Verify schema in target database. 
+    results = cratedb.database.run_sql("SHOW CREATE TABLE testdrive.demo")
+    sql = results[0][0]
+    sql = sqlparse.format(sql)
+    assert sql.startswith(DOCUMENT_DDL)
diff --git a/tests/io/mongodb/test_export.py b/tests/io/mongodb/test_export.py
new file mode 100644
index 00000000..7a5d4892
--- /dev/null
+++ b/tests/io/mongodb/test_export.py
@@ -0,0 +1,38 @@
+import pytest
+
+pytestmark = pytest.mark.mongodb
+
+
+DATA_IN = {
+    "_id": {
+        "$oid": "56027fcae4b09385a85f9344",
+    },
+    "value": {
+        "id": 42,
+        "date": {"$date": "2015-09-23T10:32:42.33Z"},
+        "list": [
+            {"id": "foo", "value": "something"},
+            {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}},
+        ],
+    },
+}
+
+DATA_OUT = {
+    "__id": "56027fcae4b09385a85f9344",
+    "value": {
+        "date": 1443004362000,
+        "id": 42,
+        "list": [
+            {"id": "foo", "value": "something"},
+            # FIXME: `$date` is not decoded yet!?
+            {"id": "bar", "value": "{'$date': '2015-09-24T10:32:42.33Z'}"},
+        ],
+    },
+}
+
+
+def test_extract_value():
+    from cratedb_toolkit.io.mongodb.export import CrateDBConverter
+
+    converter = CrateDBConverter()
+    assert converter.convert(DATA_IN) == DATA_OUT
diff --git a/tests/io/mongodb/test_translate.py b/tests/io/mongodb/test_translate.py
index ca310d6b..e4effd23 100644
--- a/tests/io/mongodb/test_translate.py
+++ b/tests/io/mongodb/test_translate.py
@@ -2,10 +2,10 @@ import unittest
 
 import pytest
 
-from cratedb_toolkit.io.mongodb import translate
-
 pytestmark = pytest.mark.mongodb
 
+from cratedb_toolkit.io.mongodb import translate  # noqa: E402
+
 
 class TestTranslate(unittest.TestCase):
     def test_types_translation(self):
diff --git a/tests/io/mongodb/test_util.py b/tests/io/mongodb/test_util.py
index 0146d0cf..8e9aa4f7 100644
--- a/tests/io/mongodb/test_util.py
+++ b/tests/io/mongodb/test_util.py
@@ -2,10 +2,10 @@ import unittest
 
 import pytest
 
-from cratedb_toolkit.io.mongodb.util import parse_input_numbers, sanitize_field_names
-
 pytestmark = pytest.mark.mongodb
 
+from cratedb_toolkit.io.mongodb.util import parse_input_numbers, sanitize_field_names  # noqa: E402
+
 
 class TestInputNumberParser(unittest.TestCase):
     def test_numbers(self):