diff --git a/CHANGES.md b/CHANGES.md index f92c2cc4..5f407ef5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- MongoDB: Fix and verify Zyp transformations ## 2024/08/21 v0.0.18 - Dependencies: Unpin commons-codec, to always use the latest version diff --git a/cratedb_toolkit/io/mongodb/transform.py b/cratedb_toolkit/io/mongodb/transform.py index 84047037..13e27d5b 100644 --- a/cratedb_toolkit/io/mongodb/transform.py +++ b/cratedb_toolkit/io/mongodb/transform.py @@ -36,7 +36,7 @@ def apply_type_overrides(self, database_name: str, collection_name: str, collect for rule in transformation.schema.rules: pointer = JsonPointer(f"/document{rule.pointer}/types") type_stats = pointer.resolve(collection_schema) - type_stats[rule.type] = 1e10 + type_stats[rule.type] = {"count": int(9e10)} def apply_transformations(self, database_name: str, collection_name: str, data: t.Dict[str, t.Any]): if not self.active: @@ -46,4 +46,6 @@ def apply_transformations(self, database_name: str, collection_name: str, data: transformation: CollectionTransformation = self.project.get(address) except KeyError: return data - return transformation.bucket.apply(data) + if transformation.bucket: + return transformation.bucket.apply(data) + return data diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index f5546dbc..c2b3716a 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -4,8 +4,6 @@ from boltons.urlutils import URL -from cratedb_toolkit.util.database import DatabaseAdapter, decode_database_table - @dataclasses.dataclass class DatabaseAddress: @@ -68,6 +66,8 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]: """ Decode database and table names, and sanitize database URI. """ + from cratedb_toolkit.util.database import decode_database_table + database, table = decode_database_table(self.dburi) uri = deepcopy(self.uri) uri.path = "" @@ -88,8 +88,14 @@ def fullname(self): """ Return a full-qualified quoted table identifier. """ + from cratedb_toolkit.util import DatabaseAdapter + return DatabaseAdapter.quote_relation_name(f"{self.schema}.{self.table}") + @classmethod + def from_string(cls, table_name_full: str) -> "TableAddress": + return TableAddress(*table_name_full.split(".")) + @dataclasses.dataclass class ClusterInformation: diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index f53d1c76..fe97b352 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -13,6 +13,7 @@ from sqlalchemy.sql.elements import AsBoolean from sqlalchemy_cratedb.dialect import CrateDialect +from cratedb_toolkit.model import TableAddress from cratedb_toolkit.util.data import str_contains try: @@ -349,6 +350,14 @@ def import_csv_dask( parallel=True, ) + def describe_table_columns(self, table_name: str): + """ + Introspect table schema returning defined columns and their types. + """ + inspector = sa.inspect(self.engine) + table_address = TableAddress.from_string(table_name) + return inspector.get_columns(table_name=t.cast(str, table_address.table), schema=table_address.schema) + def sa_is_empty(thing): """ diff --git a/examples/zyp/zyp-int64-to-timestamp.yaml b/examples/zyp/zyp-int64-to-timestamp.yaml new file mode 100644 index 00000000..4b4b867c --- /dev/null +++ b/examples/zyp/zyp-int64-to-timestamp.yaml @@ -0,0 +1,17 @@ +# Zyp transformation defining a schema override on a top-level column. + +# Timestamps in Unixtime/Epoch format are sometimes stored as +# int64 / BIGINT. This transformation defines an adjustment to +# make the target schema use a native datetime/timestamp column. +--- +meta: + type: zyp-project + version: 1 +collections: +- address: + container: testdrive + name: demo + schema: + rules: + - pointer: /timestamp + type: DATETIME diff --git a/examples/zyp-transformation.yaml b/examples/zyp/zyp-transformation.yaml similarity index 100% rename from examples/zyp-transformation.yaml rename to examples/zyp/zyp-transformation.yaml diff --git a/pyproject.toml b/pyproject.toml index 59c96270..60694d02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -158,7 +158,7 @@ kinesis = [ "lorrystream[carabas]", ] mongodb = [ - "commons-codec[mongodb,zyp]", + "commons-codec[mongodb,zyp]>=0.0.4", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1", diff --git a/tests/io/mongodb/test_transformation.py b/tests/io/mongodb/test_transformation.py new file mode 100644 index 00000000..d360c7dd --- /dev/null +++ b/tests/io/mongodb/test_transformation.py @@ -0,0 +1,52 @@ +from pathlib import Path + +import pytest +from sqlalchemy import TIMESTAMP + +from tests.conftest import check_sqlalchemy2 + +pytestmark = pytest.mark.mongodb + +pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") +pytest.importorskip("rich", reason="Skipping tests because rich is not installed") + +from cratedb_toolkit.io.mongodb.api import mongodb_copy # noqa: E402 + + +@pytest.fixture(scope="module", autouse=True) +def check_prerequisites(): + """ + This subsystem needs SQLAlchemy 2.x. + """ + check_sqlalchemy2() + + +def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb): + """ + Verify MongoDB -> CrateDB data transfer with transformation. + """ + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_one({"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000}) + + # Run transfer command. + mongodb_copy( + mongodb_url, + cratedb_url, + transformation=Path("examples/zyp/zyp-int64-to-timestamp.yaml"), + ) + + # Verify data in target database. + cratedb.database.refresh_table("testdrive.demo") + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) + assert results[0]["timestamp"] == 1563051934000 + + # Verify schema in target database. + columns = cratedb.database.describe_table_columns("testdrive.demo") + timestamp_type = columns[3]["type"] + assert isinstance(timestamp_type, TIMESTAMP)