Skip to content

Commit

Permalink
MongoDB: Make ctk load table use the data OBJECT(DYNAMIC) strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 3, 2024
1 parent b337e42 commit 453bd3b
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
based on the first 10,000 documents.
- MongoDB: Skip leaking `UNKNOWN` fields into SQL DDL.
This means relevant column definitions will not be included into the SQL DDL.
- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
42 changes: 41 additions & 1 deletion cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
from pathlib import Path

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
from cratedb_toolkit.util.database import DatabaseAdapter

logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
"""
Transfer MongoDB collection using migr8.
Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
Expand Down Expand Up @@ -83,6 +87,42 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int
return True


def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
"""
Transfer MongoDB collection using translator component.
Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""

# Decode database URL.
mongodb_address = DatabaseAddress.from_string(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table

logger.info(f"Invoking MongoDBFullLoad. mongodb_uri={mongodb_uri}")

# Optionally configure transformations.
tm = None
if transformation:
tm = TransformationManager(path=transformation)

Check warning on line 111 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L111

Added line #L111 was not covered by tests

# Invoke `full-load` procedure.
mdb_full = MongoDBFullLoad(
mongodb_url=str(mongodb_uri),
mongodb_database=mongodb_database,
mongodb_collection=mongodb_collection,
cratedb_url=target_url,
tm=tm,
progress=progress,
)
mdb_full.start()
return True


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
"""
Synopsis
Expand Down
122 changes: 122 additions & 0 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# ruff: noqa: S608
import logging
import typing as t

import pymongo
import sqlalchemy as sa
from bson.raw_bson import RawBSONDocument
from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from tqdm import tqdm

from cratedb_toolkit.io.mongodb.export import extract_value
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB):
def __init__(self, table_name: str, tm: TransformationManager = None):
super().__init__(table_name=table_name)
self.tm = tm

@staticmethod
def get_document_key(record: t.Dict[str, t.Any]) -> str:
"""
Return value of document key (MongoDB document OID) from CDC record.
"documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}
"""
return record["_id"]

def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation:
"""
Produce CrateDB INSERT SQL statement from MongoDB document.
"""

# Define SQL INSERT statement.
sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);"

# Converge MongoDB document to SQL parameters.
record = extract_value(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters = {"oid": oid, "record": record}

return SQLOperation(sql, parameters)


class MongoDBFullLoad:
"""
Copy MongoDB collection into CrateDB table.
"""

def __init__(
self,
mongodb_url: str,
mongodb_database: str,
mongodb_collection: str,
cratedb_url: str,
tm: t.Union[TransformationManager, None],
mongodb_limit: int = 0,
progress: bool = False,
debug: bool = True,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname

self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(
mongodb_url,
document_class=RawBSONDocument,
datetime_conversion="DATETIME_AUTO",
)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.mongodb_limit = mongodb_limit
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, tm=tm)

self.progress = progress
self.debug = debug

def start(self):
"""
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
"""
records_in = self.mongodb_collection.count_documents(filter={})
logger.info(f"Source: MongoDB collection={self.mongodb_collection} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out = 0

for document in self.mongodb_collection.find().limit(self.mongodb_limit):
result_size = 1

try:
operation = self.translator.to_sql(document)
logger.info("operation: %s", operation)
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue

Check warning on line 110 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L108-L110

Added lines #L108 - L110 were not covered by tests
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")

Check warning on line 116 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L115-L116

Added lines #L115 - L116 were not covered by tests

progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:
logger.warning("No data has been copied")

Check warning on line 122 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L122

Added line #L122 was not covered by tests
2 changes: 1 addition & 1 deletion cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def extract_value(value, parent_type=None):
if isinstance(value, dict):
if len(value) == 1:
if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]:
decoded = UUID(bytes=base64.b64decode(value["$binary"]["base64"]))
decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"])))
return extract_value(decoded, parent_type)
for k, v in value.items():
if k.startswith("$"):
Expand Down
2 changes: 1 addition & 1 deletion examples/zyp/zyp-int64-to-timestamp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ collections:
name: demo
schema:
rules:
- pointer: /timestamp
- pointer: /data/timestamp
type: DATETIME
49 changes: 18 additions & 31 deletions tests/io/mongodb/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,41 +52,28 @@ def test_version():
},
}
DOCUMENT_OUT = {
"__id": mock.ANY,
"id": "d575540f-759d-4653-a4c4-4a9e410f1aa1",
"value": {
"name": "foobar",
"active": True,
"created": 1592579033000,
"timestamp": 1455141600000,
"list_date": [1592579033000, 1592579033000],
"list_empty": [],
"list_float": [42.42, 43.43],
"list_integer": [42, 43],
"list_object": [{"foo": "bar"}, {"baz": "qux"}],
"list_string": ["foo", "bar"],
"oid": mock.ANY,
"data": {
"_id": mock.ANY,
"id": "d575540f-759d-4653-a4c4-4a9e410f1aa1",
"value": {
"name": "foobar",
"active": True,
"created": 1592579033000,
"timestamp": 1455141600000,
"list_date": [1592579033000, 1592579033000],
"list_empty": [],
"list_float": [42.42, 43.43],
"list_integer": [42, 43],
"list_object": [{"foo": "bar"}, {"baz": "qux"}],
"list_string": ["foo", "bar"],
},
},
}
DOCUMENT_DDL = """
CREATE TABLE IF NOT EXISTS "testdrive"."demo" (
"__id" TEXT,
"id" TEXT,
"value" OBJECT(DYNAMIC) AS (
"name" TEXT,
"active" BOOLEAN,
"created" TIMESTAMP WITH TIME ZONE,
"timestamp" TIMESTAMP WITH TIME ZONE,
"list_date" ARRAY(TIMESTAMP WITH TIME ZONE),
"list_empty" ARRAY(TEXT),
"list_float" ARRAY(REAL),
"list_integer" ARRAY(INTEGER),
"list_object" ARRAY(OBJECT(DYNAMIC) AS (
"foo" TEXT,
"baz" TEXT
)),
"list_string" ARRAY(TEXT)
)
)""".lstrip()
"oid" TEXT,
"data" OBJECT(DYNAMIC)""".lstrip()


def test_mongodb_load_table_basic(caplog, cratedb, mongodb):
Expand Down
12 changes: 7 additions & 5 deletions tests/io/mongodb/test_transformation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from pathlib import Path

import pytest
from sqlalchemy import TIMESTAMP

from tests.conftest import check_sqlalchemy2

Expand All @@ -22,6 +21,7 @@ def check_prerequisites():
check_sqlalchemy2()


@pytest.mark.skip("Wishful thinking with single column strategy")
def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb):
"""
Verify MongoDB -> CrateDB data transfer with transformation.
Expand All @@ -45,9 +45,11 @@ def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb):
# Verify data in target database.
cratedb.database.refresh_table("testdrive.demo")
results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
assert results[0]["timestamp"] == 1563051934000
assert results[0]["data"]["timestamp"] == 1563051934000

# Verify schema in target database.
columns = cratedb.database.describe_table_columns("testdrive.demo")
timestamp_type = columns[3]["type"]
assert isinstance(timestamp_type, TIMESTAMP)
type_result = cratedb.database.run_sql(
"SELECT pg_typeof(data['timestamp']) AS type FROM testdrive.demo;", records=True
)
timestamp_type = type_result[0]["type"]
assert timestamp_type == "TIMESTAMP WITH TIME ZONE"

0 comments on commit 453bd3b

Please sign in to comment.