Skip to content

Commit

Permalink
MongoDB: --treatment option, and ingress pagination / egress batching
Browse files Browse the repository at this point in the history
- Add `--treatment` option for applying special treatments
  Certain fields should be stored as lists, some need to be ignored for
  now, others need to be treated manually.

- Use pagination on source collection, for creating batches towards
  CrateDB.
  • Loading branch information
amotl committed Sep 5, 2024
1 parent 59e9c1d commit 6ec4aab
Show file tree
Hide file tree
Showing 17 changed files with 412 additions and 74 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
This means relevant column definitions will not be included into the SQL DDL.
- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.
- MongoDB: Sanitize lists of varying objects
- MongoDB: Add `--treatment` option for applying special treatments to certain items
on real-world data
- MongoDB: Use pagination on source collection, for creating batches towards CrateDB

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
13 changes: 10 additions & 3 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path):
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path, treatment: Path):
raise NotImplementedError("Child class needs to implement this method")


Expand All @@ -37,7 +37,11 @@ def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}")

def load_table(
self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None
self,
resource: InputOutputResource,
target: t.Optional[TableAddress] = None,
transformation: Path = None,
treatment: Path = None,
):
"""
Load data into a database table on CrateDB Cloud.
Expand Down Expand Up @@ -99,7 +103,9 @@ class StandaloneCluster(ClusterBase):
address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None):
def load_table(
self, resource: InputOutputResource, target: TableAddress, transformation: Path = None, treatment: Path = None
):
"""
Load data into a database table on a standalone CrateDB Server.
Expand Down Expand Up @@ -145,6 +151,7 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
source_url,
target_url,
transformation=transformation,
treatment=treatment,
limit=int(source_url_obj.query.get("limit", 0)),
progress=True,
):
Expand Down
4 changes: 3 additions & 1 deletion cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
@click.option("--treatment", type=Path, required=False, help="Path to treatment description file")
@click.pass_context
def load_table(
ctx: click.Context,
Expand All @@ -49,6 +50,7 @@ def load_table(
format_: str,
compression: str,
transformation: Path,
treatment: Path,
):
"""
Import data into CrateDB and CrateDB Cloud clusters.
Expand Down Expand Up @@ -85,4 +87,4 @@ def load_table(
cluster = StandaloneCluster(address=address)
else:
raise NotImplementedError("Unable to select backend")
return cluster.load_table(resource=resource, target=target, transformation=transformation)
return cluster.load_table(resource=resource, target=target, transformation=transformation, treatment=treatment)
9 changes: 8 additions & 1 deletion cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.io.mongodb.model import Treatment
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
Expand Down Expand Up @@ -87,7 +88,9 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi
return True


def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
def mongodb_copy(
source_url, target_url, transformation: Path = None, treatment: Path = None, limit: int = 0, progress: bool = False
):
"""
Transfer MongoDB collection using translator component.
Expand All @@ -109,6 +112,9 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int
tm = None
if transformation:
tm = TransformationManager(path=transformation)

Check warning on line 114 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L114

Added line #L114 was not covered by tests
tt = None
if treatment:
tt = Treatment.from_yaml(treatment.read_text())

# Invoke `full-load` procedure.
mdb_full = MongoDBFullLoad(
Expand All @@ -117,6 +123,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int
mongodb_collection=mongodb_collection,
cratedb_url=target_url,
tm=tm,
treatment=tt,
progress=progress,
)
mdb_full.start()
Expand Down
79 changes: 62 additions & 17 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from yarl import URL

from cratedb_toolkit.io.mongodb.export import extract_value
from cratedb_toolkit.io.mongodb.export import CrateDBConverter, CrateDBConverterConfig
from cratedb_toolkit.io.mongodb.model import DocumentDict, Treatment
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter
Expand All @@ -18,8 +21,13 @@


class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB):
def __init__(self, table_name: str, tm: TransformationManager = None):
"""
Translate a MongoDB document into a CrateDB document.
"""

def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None):
super().__init__(table_name=table_name)
self.converter = converter
self.tm = tm

@staticmethod
Expand All @@ -31,18 +39,22 @@ def get_document_key(record: t.Dict[str, t.Any]) -> str:
"""
return record["_id"]

def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation:
def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation:
"""
Produce CrateDB INSERT SQL statement from MongoDB document.
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if not isinstance(data, list):
data = [data]

Check warning on line 47 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L47

Added line #L47 was not covered by tests

# Define SQL INSERT statement.
sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);"

# Converge MongoDB document to SQL parameters.
record = extract_value(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters = {"oid": oid, "record": record}
# Converge multiple MongoDB documents into SQL parameters for `executemany` operation.
parameters: t.List[DocumentDict] = []
for document in data:
record = self.converter.convert(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters.append({"oid": oid, "record": record})

return SQLOperation(sql, parameters)

Expand All @@ -59,14 +71,17 @@ def __init__(
mongodb_collection: str,
cratedb_url: str,
tm: t.Union[TransformationManager, None],
treatment: t.Union[Treatment, None],
mongodb_limit: int = 0,
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname

self.mongodb_uri = URL(mongodb_url)
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(
mongodb_url,
document_class=RawBSONDocument,
Expand All @@ -76,11 +91,21 @@ def __init__(
self.mongodb_limit = mongodb_limit
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, tm=tm)

# Transformation machinery.
if treatment:
converter_config = CrateDBConverterConfig(treatment=treatment)
self.converter = CrateDBConverter(config=converter_config)
else:
self.converter = CrateDBConverter()
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm)

self.on_error = on_error
self.progress = progress
self.debug = debug

self.batch_size: int = int(self.mongodb_uri.query.get("batch-size", 250))

def start(self):
"""
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
Expand All @@ -90,29 +115,49 @@ def start(self):
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection:
with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out = 0
records_out: int = 0

for document in self.mongodb_collection.find().limit(self.mongodb_limit):
skip: int = 0
while True:
progress_bar.set_description("ACQUIRE")
# Acquire batch of documents, and convert to SQL operation.
documents = self.mongodb_collection.find().skip(skip).limit(self.batch_size).batch_size(self.batch_size)
try:
operation = self.translator.to_sql(document)
logger.debug("SQL operation: %s", operation)
operation = self.translator.to_sql(list(documents))
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue
logger_on_error(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
break

Check warning on line 138 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L134-L138

Added lines #L134 - L138 were not covered by tests

# When input data is exhausted, stop processing.
progress_bar.set_description("CHECK")
if not operation.parameters:
break

# Submit operation to CrateDB.
progress_bar.set_description("SUBMIT")
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise ValueError("Unable to insert one or more records")

Check warning on line 151 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L151

Added line #L151 was not covered by tests
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")
logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}")
if self.on_error == "raise":
raise

Check warning on line 157 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L154-L157

Added lines #L154 - L157 were not covered by tests

# Next page.
skip += self.batch_size

progress_bar.close()
connection.commit()
Expand Down
Loading

0 comments on commit 6ec4aab

Please sign in to comment.