Skip to content

Commit

Permalink
MongoDB: Fix and verify Zyp transformations
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 26, 2024
1 parent 0360b11 commit 866f182
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- MongoDB: Fix and verify Zyp transformations

## 2024/08/21 v0.0.18
- Dependencies: Unpin commons-codec, to always use the latest version
Expand Down
6 changes: 4 additions & 2 deletions cratedb_toolkit/io/mongodb/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def apply_type_overrides(self, database_name: str, collection_name: str, collect
for rule in transformation.schema.rules:
pointer = JsonPointer(f"/document{rule.pointer}/types")
type_stats = pointer.resolve(collection_schema)
type_stats[rule.type] = 1e10
type_stats[rule.type] = {"count": int(9e10)}

def apply_transformations(self, database_name: str, collection_name: str, data: t.Dict[str, t.Any]):
if not self.active:
Expand All @@ -46,4 +46,6 @@ def apply_transformations(self, database_name: str, collection_name: str, data:
transformation: CollectionTransformation = self.project.get(address)
except KeyError:
return data
return transformation.bucket.apply(data)
if transformation.bucket:
return transformation.bucket.apply(data)
return data
10 changes: 8 additions & 2 deletions cratedb_toolkit/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

from boltons.urlutils import URL

from cratedb_toolkit.util.database import DatabaseAdapter, decode_database_table


@dataclasses.dataclass
class DatabaseAddress:
Expand Down Expand Up @@ -68,6 +66,8 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]:
"""
Decode database and table names, and sanitize database URI.
"""
from cratedb_toolkit.util.database import decode_database_table

database, table = decode_database_table(self.dburi)
uri = deepcopy(self.uri)
uri.path = ""
Expand All @@ -88,8 +88,14 @@ def fullname(self):
"""
Return a full-qualified quoted table identifier.
"""
from cratedb_toolkit.util import DatabaseAdapter

return DatabaseAdapter.quote_relation_name(f"{self.schema}.{self.table}")

@classmethod
def from_string(cls, table_name_full: str) -> "TableAddress":
    """
    Build a table address from a `<schema>.<table>` string.

    A name without a dot is treated as a bare table name, and the schema
    is left unset, instead of the name ending up in the schema slot.

    :param table_name_full: Table name, optionally prefixed by `<schema>.`.
    :return: Address instance of the class this method is invoked on.
    """
    parts = table_name_full.split(".")
    if len(parts) == 1:
        # No schema qualifier: The only component is the table name.
        return cls(None, parts[0])
    # Two components map to (schema, table). Anything longer is handed to
    # the constructor as-is, exactly like before.
    return cls(*parts)


@dataclasses.dataclass
class ClusterInformation:
Expand Down
9 changes: 9 additions & 0 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sqlalchemy.sql.elements import AsBoolean
from sqlalchemy_cratedb.dialect import CrateDialect

from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util.data import str_contains

try:
Expand Down Expand Up @@ -349,6 +350,14 @@ def import_csv_dask(
parallel=True,
)

def describe_table_columns(self, table_name: str):
    """
    Return the column definitions of a table, as reported by the
    SQLAlchemy inspector created for this adapter's engine.

    The `table_name` string is decoded by `TableAddress.from_string`, and
    its schema and table components are forwarded to the inspector.
    """
    engine_inspector = sa.inspect(self.engine)
    address = TableAddress.from_string(table_name)
    columns = engine_inspector.get_columns(
        table_name=t.cast(str, address.table),
        schema=address.schema,
    )
    return columns


def sa_is_empty(thing):
"""
Expand Down
17 changes: 17 additions & 0 deletions examples/zyp/zyp-int64-to-timestamp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Zyp transformation defining a schema override on a top-level column.

# Timestamps in Unixtime/Epoch format are sometimes stored as
# int64 / BIGINT. This transformation defines an adjustment to
# make the target schema use a native datetime/timestamp column.
---
meta:
type: zyp-project
version: 1
collections:
- address:
container: testdrive
name: demo
schema:
rules:
- pointer: /timestamp
type: DATETIME
File renamed without changes.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ kinesis = [
"lorrystream[carabas]",
]
mongodb = [
"commons-codec[mongodb,zyp]",
"commons-codec[mongodb,zyp]>=0.0.4",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
52 changes: 52 additions & 0 deletions tests/io/mongodb/test_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from pathlib import Path

import pytest
from sqlalchemy import TIMESTAMP

from tests.conftest import check_sqlalchemy2

pytestmark = pytest.mark.mongodb

pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed")
pytest.importorskip("rich", reason="Skipping tests because rich is not installed")

from cratedb_toolkit.io.mongodb.api import mongodb_copy # noqa: E402


@pytest.fixture(scope="module", autouse=True)
def check_prerequisites():
    """
    Module-scoped guard, applied automatically to the tests in this module.

    Delegates to `check_sqlalchemy2`, because this subsystem needs
    SQLAlchemy 2.x.
    """
    check_sqlalchemy2()


def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb):
    """
    Verify MongoDB -> CrateDB data transfer with transformation.

    The Zyp transformation file declares a schema override for the
    `/timestamp` field. The stored value must arrive unchanged, while the
    target column must be reflected as a timestamp type.
    """
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
    mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo"

    # Populate source database.
    client: pymongo.MongoClient = mongodb.get_connection_client()
    testdrive = client.get_database("testdrive")
    demo = testdrive.create_collection("demo")
    demo.insert_one({"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000})

    # Run transfer command.
    mongodb_copy(
        mongodb_url,
        cratedb_url,
        transformation=Path("examples/zyp/zyp-int64-to-timestamp.yaml"),
    )

    # Verify data in target database.
    cratedb.database.refresh_table("testdrive.demo")
    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
    assert results[0]["timestamp"] == 1563051934000

    # Verify schema in target database.
    # Look up the column by name instead of by position, so the check does
    # not depend on the order in which columns are reflected.
    columns = cratedb.database.describe_table_columns("testdrive.demo")
    column_types = {column["name"]: column["type"] for column in columns}
    assert isinstance(column_types["timestamp"], TIMESTAMP)

0 comments on commit 866f182

Please sign in to comment.