Skip to content

Commit

Permalink
MongoDB: Accept Zyp Treatments, and ingress pagination / egress batching
Browse files Browse the repository at this point in the history
- Use `--transformation` option for applying special treatments.
  Certain fields should be stored as lists, some need to be ignored for
  now, others need to be treated manually, etc.

- Use pagination on source collection, for creating batches towards
  CrateDB.
  • Loading branch information
amotl committed Sep 10, 2024
1 parent befd579 commit 4297f5d
Show file tree
Hide file tree
Showing 17 changed files with 467 additions and 111 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
This means relevant column definitions will not be included into the SQL DDL.
- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.
- MongoDB: Sanitize lists of varying objects
- MongoDB: Add `--transformation` option for applying special treatments to certain items
on real-world data
- MongoDB: Use pagination on source collection, for creating batches towards CrateDB

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
5 changes: 4 additions & 1 deletion cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}")

def load_table(
self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None
self,
resource: InputOutputResource,
target: t.Optional[TableAddress] = None,
transformation: Path = None,
):
"""
Load data into a database table on CrateDB Cloud.
Expand Down
78 changes: 61 additions & 17 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from yarl import URL
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.mongodb.export import extract_value
from cratedb_toolkit.io.mongodb.export import CrateDBConverter
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter
Expand All @@ -18,8 +22,13 @@


class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB):
def __init__(self, table_name: str, tm: TransformationManager = None):
"""
Translate a MongoDB document into a CrateDB document.
"""

def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None):
super().__init__(table_name=table_name)
self.converter = converter
self.tm = tm

@staticmethod
Expand All @@ -31,18 +40,22 @@ def get_document_key(record: t.Dict[str, t.Any]) -> str:
"""
return record["_id"]

def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation:
def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation:
"""
Produce CrateDB INSERT SQL statement from MongoDB document.
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);"

# Converge MongoDB document to SQL parameters.
record = extract_value(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters = {"oid": oid, "record": record}
# Converge multiple MongoDB documents into SQL parameters for `executemany` operation.
parameters: t.List[DocumentDict] = []
for document in data:
record = self.converter.convert(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters.append({"oid": oid, "record": record})

return SQLOperation(sql, parameters)

Expand All @@ -60,13 +73,15 @@ def __init__(
cratedb_url: str,
tm: t.Union[TransformationManager, None],
mongodb_limit: int = 0,
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname

self.mongodb_uri = URL(mongodb_url)
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(
mongodb_url,
document_class=RawBSONDocument,
Expand All @@ -76,11 +91,20 @@ def __init__(
self.mongodb_limit = mongodb_limit
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, tm=tm)

# Transformation machinery.
transformation = None
if tm:
transformation = tm.project.get(CollectionAddress(container=mongodb_database, name=mongodb_collection))
self.converter = CrateDBConverter(transformation=transformation)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm)

self.on_error = on_error
self.progress = progress
self.debug = debug

self.batch_size: int = int(self.mongodb_uri.query.get("batch-size", 250))

def start(self):
"""
Read items from MongoDB collection, convert to SQL INSERT statements, and submit to CrateDB.
Expand All @@ -90,29 +114,49 @@ def start(self):
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection:
with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out = 0
records_out: int = 0

for document in self.mongodb_collection.find().limit(self.mongodb_limit):
skip: int = 0
while True:
progress_bar.set_description("ACQUIRE")
# Acquire batch of documents, and convert to SQL operation.
documents = self.mongodb_collection.find().skip(skip).limit(self.batch_size).batch_size(self.batch_size)
try:
operation = self.translator.to_sql(document)
logger.debug("SQL operation: %s", operation)
operation = self.translator.to_sql(list(documents))
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue
logger_on_error(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
break

# When input data is exhausted, stop processing.
progress_bar.set_description("CHECK")
if not operation.parameters:
break

# Submit operation to CrateDB.
progress_bar.set_description("SUBMIT")
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise ValueError("Unable to insert one or more records")
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")
logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}")
if self.on_error == "raise":
raise

# Next page.
skip += self.batch_size

progress_bar.close()
connection.commit()
Expand Down
114 changes: 51 additions & 63 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,25 @@
"""

import base64
import builtins
import calendar
import logging
import typing as t
from uuid import UUID

import bsonjs
import dateutil.parser as dateparser
import orjson as json
import pymongo.collection
from attr import Factory
from attrs import define
from zyp.model.collection import CollectionTransformation

from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.io.mongodb.util import sanitize_field_names

logger = logging.getLogger(__name__)


def date_converter(value):
if isinstance(value, int):
Expand All @@ -60,81 +65,64 @@ def timestamp_converter(value):
}


def extract_value(value, parent_type=None):
"""
Decode MongoDB Extended JSON.
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
if len(value) == 1:
if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]:
decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"])))
return extract_value(decoded, parent_type)
for k, v in value.items():
if k.startswith("$"):
return extract_value(v, k.lstrip("$"))
return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()}
if isinstance(value, list):
if value and isinstance(value[0], dict):
lovos = ListOfVaryingObjectsSanitizer(value)
lovos.apply()

return [extract_value(v, parent_type) for v in value]
if parent_type:
converter = type_converter.get(parent_type)
if converter:
return converter(value)
return value


@define
class ListOfVaryingObjectsSanitizer:
"""
CrateDB can not store lists of varying objects, so normalize them.
"""

data: t.List[t.Dict[str, t.Any]]

def apply(self):
self.apply_rules(self.get_rules(self.type_stats()))
class CrateDBConverter:
transformation: CollectionTransformation = Factory(CollectionTransformation)

def convert(self, data: DocumentDict) -> t.Dict[str, t.Any]:
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
return self.extract_value(data)

def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any:
"""
Decode MongoDB Extended JSON.
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
# Custom adjustments to compensate for shape anomalies in source data.
self.apply_special_treatments(value)
if len(value) == 1:
if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]:
decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"])))
return self.extract_value(decoded, parent_type)
for k, v in value.items():
if k.startswith("$"):
return self.extract_value(v, k.lstrip("$"))
return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()}
if isinstance(value, list):
return [self.extract_value(v, parent_type) for v in value]
if parent_type:
converter = type_converter.get(parent_type)
if converter:
return converter(value)
return value

def type_stats(self) -> t.Dict[str, t.List[str]]:
types: t.Dict[str, t.List[str]] = {}
for item in self.data:
for key, value in item.items():
types.setdefault(key, []).append(type(value).__name__)
return types
def apply_special_treatments(self, value: t.Any):
"""
Apply special treatments to value that can't be described otherwise up until now.
# Ignore certain items including anomalies that are not resolved, yet.
def get_rules(self, all_types):
rules = []
for name, types in all_types.items():
if len(types) > 1:
rules.append({"name": name, "converter": self.get_best_converter(types)})
return rules
TODO: Needs an integration test feeding two records instead of just one.
"""

def apply_rules(self, rules):
for item in self.data:
for rule in rules:
name = rule["name"]
if name in item:
item[name] = rule["converter"](item[name])
if self.transformation is None or self.transformation.treatment is None:
return None

@staticmethod
def get_best_converter(types: t.List[str]) -> t.Callable:
if "str" in types:
return builtins.str
return lambda x: x
return self.transformation.treatment.apply(value)


def convert(d):
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
converter = CrateDBConverter()
newdict = {}
for k, v in sanitize_field_names(d).items():
newdict[k] = extract_value(v)
newdict[k] = converter.convert(v)
return newdict


Expand Down
3 changes: 3 additions & 0 deletions cratedb_toolkit/io/mongodb/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import typing as t

# Type alias for a document represented as a dictionary with string keys and arbitrary values.
DocumentDict = t.Dict[str, t.Any]
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
import typing as t

from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.util.data_dict import OrderedDictX


Expand All @@ -26,7 +26,7 @@ def parse_input_numbers(s: str):
return options


def sanitize_field_names(data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
def sanitize_field_names(data: DocumentDict) -> DocumentDict:
"""
Rename top-level column names with single leading underscores to double leading underscores.
CrateDB does not accept single leading underscores, like `_id`.
Expand Down
Loading

0 comments on commit 4297f5d

Please sign in to comment.