From cf8213d8e449818ddd93ae232acc2ade649c4c6c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 9 Sep 2024 12:43:58 +0200 Subject: [PATCH] MongoDB: Unlock importing MongoDB Extended JSON using `file+bson://...` --- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 68 ++++++---- cratedb_toolkit/io/mongodb/adapter.py | 134 +++++++++++++++++++ cratedb_toolkit/io/mongodb/api.py | 21 +-- cratedb_toolkit/io/mongodb/copy.py | 62 ++++----- cratedb_toolkit/io/mongodb/model.py | 1 + cratedb_toolkit/io/mongodb/util.py | 22 +++- cratedb_toolkit/model.py | 3 +- cratedb_toolkit/util/database.py | 6 + doc/io/mongodb/loader.md | 160 +++++++++++++++++++---- examples/zyp/zyp-mongodb-json-files.yaml | 26 ++++ pyproject.toml | 1 + tests/io/mongodb/books-canonical.ndjson | 4 + tests/io/mongodb/books-relaxed.ndjson | 4 + tests/io/mongodb/books.bson.gz | Bin 0 -> 2452 bytes tests/io/mongodb/conftest.py | 1 + tests/io/mongodb/test_copy.py | 115 ++++++++++++++++ 17 files changed, 526 insertions(+), 103 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/adapter.py create mode 100644 examples/zyp/zyp-mongodb-json-files.yaml create mode 100644 tests/io/mongodb/books-canonical.ndjson create mode 100644 tests/io/mongodb/books-relaxed.ndjson create mode 100644 tests/io/mongodb/books.bson.gz create mode 100644 tests/io/mongodb/test_copy.py diff --git a/CHANGES.md b/CHANGES.md index 84f29ad9..aa1cd01c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ - MongoDB: Add `--treatment` option for applying special treatments to certain items on real-world data - MongoDB: Use pagination on source collection, for creating batches towards CrateDB +- MongoDB: Unlock importing MongoDB Extended JSON files using `file+bson://...` ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 4706a132..bb416cb6 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -6,7 +6,7 @@ from abc import abstractmethod from pathlib import Path -from yarl import URL +from boltons.urlutils import URL from cratedb_toolkit.api.guide import GuidingTexts from cratedb_toolkit.cluster.util import get_cluster_info @@ -113,46 +113,66 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf ctk load table influxdb2://example:token@localhost:8086/testdrive/demo ctk load table mongodb://localhost:27017/testdrive/demo """ - source_url = resource.url + source_url_obj = URL(resource.url) target_url = self.address.dburi - source_url_obj = URL(source_url) - if source_url.startswith("dynamodb"): + + if source_url_obj.scheme.startswith("dynamodb"): from cratedb_toolkit.io.dynamodb.api import dynamodb_copy - if not dynamodb_copy(source_url, target_url, progress=True): + if not dynamodb_copy(str(source_url_obj), target_url, progress=True): msg = "Data loading failed" logger.error(msg) raise OperationFailed(msg) - elif source_url.startswith("influxdb"): + elif source_url_obj.scheme.startswith("file"): + if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme: + mongodb_copy_generic( + str(source_url_obj), + target_url, + transformation=transformation, + progress=True, + ) + + elif source_url_obj.scheme.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy - http_scheme = "http://" - if asbool(source_url_obj.query.get("ssl")): - http_scheme = "https://" - source_url = source_url.replace("influxdb2://", http_scheme) - if not influxdb_copy(source_url, target_url, progress=True): + http_scheme = "http" + if asbool(source_url_obj.query_params.get("ssl")): + http_scheme = "https" + source_url_obj.scheme = source_url_obj.scheme.replace("influxdb2", http_scheme) + if not influxdb_copy(str(source_url_obj), target_url, progress=True): msg = "Data loading failed" logger.error(msg) raise OperationFailed(msg) - elif source_url.startswith("mongodb"): - if "+cdc" in source_url: - source_url = source_url.replace("+cdc", "") + + elif source_url_obj.scheme.startswith("mongodb"): + if "+cdc" in source_url_obj.scheme: + source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "") from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc - mongodb_relay_cdc(source_url, target_url, progress=True) + mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) else: - from cratedb_toolkit.io.mongodb.api import mongodb_copy - - if not mongodb_copy( - source_url, + mongodb_copy_generic( + str(source_url_obj), target_url, transformation=transformation, - limit=int(source_url_obj.query.get("limit", 0)), progress=True, - ): - msg = "Data loading failed" - logger.error(msg) - raise OperationFailed(msg) + ) else: raise NotImplementedError("Importing resource not implemented yet") + + +def mongodb_copy_generic( + source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False +): + from cratedb_toolkit.io.mongodb.api import mongodb_copy + + if not mongodb_copy( + source_url, + target_url, + transformation=transformation, + progress=progress, + ): + msg = "Data loading failed" + logger.error(msg) + raise OperationFailed(msg) diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py new file mode 100644 index 00000000..e539e778 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -0,0 +1,134 @@ +import itertools +import logging +import typing as t +from abc import abstractmethod +from functools import cached_property +from pathlib import Path + +import boltons.urlutils +import polars as pl +import pymongo +import yarl +from attrs import define, field +from boltons.urlutils import URL +from bson.raw_bson import RawBSONDocument +from undatum.common.iterable import IterableData + +from cratedb_toolkit.io.mongodb.util import batches +from cratedb_toolkit.model import DatabaseAddress + +logger = logging.getLogger(__name__) + + +@define +class MongoDBAdapterBase: + address: DatabaseAddress + effective_url: URL + database_name: str + collection_name: str + + _custom_query_parameters = ["batch-size", "limit", "offset"] + + @classmethod + def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]): + if not isinstance(url, str): + url = str(url) + mongodb_address = DatabaseAddress.from_string(url) + mongodb_uri, mongodb_collection_address = mongodb_address.decode() + logger.info(f"Collection address: {mongodb_collection_address}") + mongodb_database = mongodb_collection_address.schema + mongodb_collection = mongodb_collection_address.table + for custom_query_parameter in cls._custom_query_parameters: + mongodb_uri.query_params.pop(custom_query_parameter, None) + return cls( + address=mongodb_address, + effective_url=mongodb_uri, + database_name=mongodb_database, + collection_name=mongodb_collection, + ) + + def __attrs_post_init__(self): + self.setup() + + @cached_property + def batch_size(self) -> int: + return int(self.address.uri.query_params.get("batch-size", 500)) + + @cached_property + def limit(self) -> int: + return int(self.address.uri.query_params.get("limit", 0)) + + @cached_property + def offset(self) -> int: + return int(self.address.uri.query_params.get("offset", 0)) + + @abstractmethod + def setup(self): + raise NotImplementedError() + + @abstractmethod + def record_count(self, filter_=None) -> int: + raise NotImplementedError() + + @abstractmethod + def query(self): + raise NotImplementedError() + + +@define +class MongoDBFileAdapter(MongoDBAdapterBase): + _path: Path = field(init=False) + + def setup(self): + self._path = Path(self.address.uri.path) + + def record_count(self, filter_=None) -> int: + """ + https://stackoverflow.com/a/27517681 + """ + f = open(self._path, "rb") + bufgen = itertools.takewhile(lambda x: x, (f.raw.read(1024 * 1024) for _ in itertools.repeat(None))) + return sum(buf.count(b"\n") for buf in bufgen if buf) + + def query(self): + if not self._path.exists(): + raise FileNotFoundError(f"Resource not found: {self._path}") + if self.offset: + raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") + if self._path.suffix in [".ndjson", ".jsonl"]: + data = pl.read_ndjson(self._path, batch_size=self.batch_size, n_rows=self.limit or None).to_dicts() + elif ".bson" in str(self._path): + data = IterableData(str(self._path), options={"format_in": "bson"}).iter() + else: + raise ValueError(f"Unsupported file type: {self._path.suffix}") + return batches(data, self.batch_size) + + +@define +class MongoDBServerAdapter(MongoDBAdapterBase): + _mongodb_client: pymongo.MongoClient = field(init=False) + _mongodb_collection: pymongo.collection.Collection = field(init=False) + + def setup(self): + self._mongodb_client: pymongo.MongoClient = pymongo.MongoClient( + str(self.effective_url), + document_class=RawBSONDocument, + datetime_conversion="DATETIME_AUTO", + ) + self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name] + + def record_count(self, filter_=None) -> int: + filter_ = filter_ or {} + return self._mongodb_collection.count_documents(filter=filter_) + + def query(self): + data = self._mongodb_collection.find().batch_size(self.batch_size).skip(self.offset).limit(self.limit) + return batches(data, self.batch_size) + + +def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase: + if mongodb_uri.scheme.startswith("file"): + return MongoDBFileAdapter.from_url(mongodb_uri) + elif mongodb_uri.scheme.startswith("mongodb"): + return MongoDBServerAdapter.from_url(mongodb_uri) + raise ValueError("Unable to create MongoDB adapter") diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index e95890c9..35acb47b 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -1,5 +1,6 @@ import argparse import logging +import typing as t from pathlib import Path from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB @@ -41,12 +42,11 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi # 1. Extract schema from MongoDB collection. logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}") extract_args = argparse.Namespace( - url=str(mongodb_uri), + url=str(mongodb_uri) + f"&limit={limit}", database=mongodb_database, collection=mongodb_collection, scan="partial", transformation=transformation, - limit=limit, ) mongodb_schema = extract(extract_args) count = mongodb_schema[mongodb_collection]["count"] @@ -75,11 +75,10 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}" ) export_args = argparse.Namespace( - url=str(mongodb_uri), + url=str(mongodb_uri) + f"&limit={limit}", database=mongodb_database, collection=mongodb_collection, transformation=transformation, - limit=limit, ) buffer = export(export_args) cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) @@ -87,7 +86,7 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi return True -def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): +def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = None, progress: bool = False): """ Transfer MongoDB collection using translator component. @@ -97,13 +96,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int ctk load table mongodb://localhost:27017/testdrive/demo """ - # Decode database URL. - mongodb_address = DatabaseAddress.from_string(source_url) - mongodb_uri, mongodb_collection_address = mongodb_address.decode() - mongodb_database = mongodb_collection_address.schema - mongodb_collection = mongodb_collection_address.table - - logger.info(f"Invoking MongoDBFullLoad. mongodb_uri={mongodb_uri}") + logger.info(f"Invoking MongoDBFullLoad. source_url={source_url}") # Optionally configure transformations. tm = None @@ -112,9 +105,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int # Invoke `full-load` procedure. mdb_full = MongoDBFullLoad( - mongodb_url=str(mongodb_uri), - mongodb_database=mongodb_database, - mongodb_collection=mongodb_collection, + mongodb_url=source_url, cratedb_url=target_url, tm=tm, progress=progress, diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index bd9c3ff9..f0980af1 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -2,16 +2,16 @@ import logging import typing as t -import pymongo import sqlalchemy as sa -from bson.raw_bson import RawBSONDocument +from boltons.urlutils import URL from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from pymongo.cursor import Cursor from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm -from yarl import URL from zyp.model.collection import CollectionAddress +from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory from cratedb_toolkit.io.mongodb.export import CrateDBConverter from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager @@ -44,6 +44,8 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat """ Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. """ + if isinstance(data, Cursor): + data = list(data) if not isinstance(data, list): data = [data] @@ -68,34 +70,34 @@ class MongoDBFullLoad: def __init__( self, mongodb_url: str, - mongodb_database: str, - mongodb_collection: str, cratedb_url: str, tm: t.Union[TransformationManager, None], - mongodb_limit: int = 0, - on_error: t.Literal["ignore", "raise"] = "ignore", + on_error: t.Literal["ignore", "raise"] = "raise", progress: bool = False, debug: bool = True, ): + # Decode database URL: MongoDB. + self.mongodb_uri = URL(mongodb_url) + self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri) + + # Decode database URL: CrateDB. cratedb_address = DatabaseAddress.from_string(cratedb_url) cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() cratedb_table = cratedb_table_address.fullname - self.mongodb_uri = URL(mongodb_url) - self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient( - mongodb_url, - document_class=RawBSONDocument, - datetime_conversion="DATETIME_AUTO", - ) - self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] - self.mongodb_limit = mongodb_limit self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) # Transformation machinery. transformation = None if tm: - transformation = tm.project.get(CollectionAddress(container=mongodb_database, name=mongodb_collection)) + address = CollectionAddress( + container=self.mongodb_adapter.database_name, name=self.mongodb_adapter.collection_name + ) + try: + transformation = tm.project.get(address=address) + except KeyError: + pass self.converter = CrateDBConverter(transformation=transformation) self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm) @@ -103,14 +105,12 @@ def __init__( self.progress = progress self.debug = debug - self.batch_size: int = int(self.mongodb_uri.query.get("batch-size", 250)) - def start(self): """ Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. """ - records_in = self.mongodb_collection.count_documents(filter={}) - logger.info(f"Source: MongoDB collection={self.mongodb_collection} count={records_in}") + records_in = self.mongodb_adapter.record_count() + logger.info(f"Source: MongoDB collection={self.mongodb_adapter.collection_name} count={records_in}") logger_on_error = logger.warning if self.debug: logger_on_error = logger.exception @@ -123,26 +123,20 @@ def start(self): progress_bar = tqdm(total=records_in) records_out: int = 0 - skip: int = 0 - while True: + # Acquire batches of documents, convert to SQL operations, and submit to CrateDB. + for documents in self.mongodb_adapter.query(): progress_bar.set_description("ACQUIRE") - # Acquire batch of documents, and convert to SQL operation. - documents = self.mongodb_collection.find().skip(skip).limit(self.batch_size).batch_size(self.batch_size) + try: - operation = self.translator.to_sql(list(documents)) + operation = self.translator.to_sql(documents) except Exception as ex: logger_on_error(f"Computing query failed: {ex}") if self.on_error == "raise": raise - break - - # When input data is exhausted, stop processing. - progress_bar.set_description("CHECK") - if not operation.parameters: - break + continue # Submit operation to CrateDB. - progress_bar.set_description("SUBMIT") + progress_bar.set_description("SUBMIT ") try: result = connection.execute(sa.text(operation.statement), operation.parameters) result_size = result.rowcount @@ -154,9 +148,7 @@ def start(self): logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}") if self.on_error == "raise": raise - - # Next page. - skip += self.batch_size + continue progress_bar.close() connection.commit() diff --git a/cratedb_toolkit/io/mongodb/model.py b/cratedb_toolkit/io/mongodb/model.py index 37e62548..559b0ad0 100644 --- a/cratedb_toolkit/io/mongodb/model.py +++ b/cratedb_toolkit/io/mongodb/model.py @@ -1,3 +1,4 @@ import typing as t DocumentDict = t.Dict[str, t.Any] +Documents = t.List[DocumentDict] diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 529e5f6b..0aed69a8 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,6 +1,9 @@ import re +import typing as t -from cratedb_toolkit.io.mongodb.model import DocumentDict +from pymongo.cursor import Cursor + +from cratedb_toolkit.io.mongodb.model import DocumentDict, Documents from cratedb_toolkit.util.data_dict import OrderedDictX @@ -39,3 +42,20 @@ def sanitize_field_names(data: DocumentDict) -> DocumentDict: if name.startswith("_") and not name.startswith("__"): d.rename_key(name, f"_{name}") return d + + +def batches(data: t.Union[Cursor, Documents], batch_size: int = 100) -> t.Generator[Documents, None, None]: + """ + Generate batches of documents. + """ + count = 0 + buffer = [] + for item in data: + buffer.append(item) + count += 1 + if count >= batch_size: + yield buffer + buffer = [] + count = 0 + if buffer: + yield buffer diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index c2b3716a..66f443d7 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -70,7 +70,8 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]: database, table = decode_database_table(self.dburi) uri = deepcopy(self.uri) - uri.path = "" + if not uri.scheme.startswith("file"): + uri.path = "" return uri, TableAddress(database, table) diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index fe97b352..ef6af8ee 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -391,6 +391,12 @@ def decode_database_table(url: str) -> t.Tuple[str, str]: table = url_.query_params.get("table") if url_.scheme == "crate" and not database: database = url_.query_params.get("schema") + if database is None and table is None: + if url_.scheme.startswith("file"): + _, database, table = url_.path.rsplit("/", 2) + table, _ = table.split(".", 1) + if database is None and table is None: + raise ValueError("Database and table must be specified") from ex return database, table diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md index 7242687a..40e736ee 100644 --- a/doc/io/mongodb/loader.md +++ b/doc/io/mongodb/loader.md @@ -2,18 +2,109 @@ # MongoDB Table Loader ## About -Load data from MongoDB into CrateDB using a one-stop command -`ctk load table mongodb://...`, in order to facilitate convenient -data transfers to be used within data pipelines or ad hoc operations. +Load data from MongoDB and its file formats into CrateDB using a one-stop +command `ctk load table`, in order to facilitate convenient data transfers +to be used within data pipelines or ad hoc operations. + +## Coverage +CrateDB Toolkit supports different variants to load MongoDB data from +server instances and filesystems. + +- `mongodb://` + + Connect to MongoDB Community or Enterprise Edition. + +- `mongodb+srv://` + + Connect to MongoDB Atlas. + +- `file+bson://` + + Read [MongoDB Extended JSON] format from filesystem. ## Install ```shell pip install --upgrade 'cratedb-toolkit[mongodb]' ``` -## Example -Import two data points into MongoDB. +## Usage +The MongoDB I/O adapter can process MongoDB data from different sources. +This section enumerates relevant connectivity options on behalf of +concrete usage examples. +### MongoDB Community and Enterprise +Transfer data from MongoDB database/collection into CrateDB schema/table. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk load table "mongodb://localhost:27017/testdrive/demo" +``` +Query data in CrateDB. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk shell --command "SELECT * FROM testdrive.demo;" +ctk show table "testdrive.demo" +``` + +### MongoDB Atlas +Transfer data from MongoDB Atlas. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker/stocks?batch-size=5000" +``` + +### MongoDB JSON/BSON files +Load data from MongoDB JSON/BSON files, for example produced by the +`mongoexport` or `mongodump` programs. +In order to get hold of a few samples worth of data, the canonical MongoDB C +driver's [libbson test files] includes a few. In this case, let's acquire +the collection at [mongodb-json-files]. +```shell +git clone https://github.com/ozlerhakan/mongodb-json-files.git +``` +```shell +CRATEDB_SQLALCHEMY_BASEURL=crate://crate@localhost:4200/testdrive +ctk load table \ + "file+bson:///path/to/mongodb-json-files/datasets/books.json" \ + --cratedb-sqlalchemy-url="${CRATEDB_SQLALCHEMY_BASEURL}/books" +``` +Address relative and/or compressed BSON files like +`file+bson:./tmp/testdrive/books.bson.gz`. + +Example queries that fit the schema of `books.json`, and more, can be +found at [](#ready-made-queries). + + +## Options + +### Batch Size +The default batch size is 500. You can adjust the value by appending the HTTP +URL query parameter `batch-size` to the source URL, like +`mongodb+srv://managed.mongodb.net/ticker/stocks?batch-size=5000`. + +### Offset +Use the HTTP URL query parameter `offset` on the source URL, like +`&offset=42`, in order to start processing at this record from the +beginning. + +### Limit +Use the HTTP URL query parameter `limit` on the source URL, like +`&limit=100`, in order to limit processing to a total number of +records. + +## Zyp Transformations +You can use [Zyp Transformations] to change the shape of the data while being +transferred. In order to add it to the pipeline, use the `--transformation` +command line option. + +It is also available for the `migr8 extract` and `migr8 export` commands. +Example transformation files in YAML format can be explored at [examples/zyp]. + + +## Appendix + +### Insert Exercise +Import two data points into MongoDB database `testdrive` and collection `demo`, +using the `mongosh` CLI program. ```shell mongosh mongodb://localhost:27017/testdrive <=3.10.1", "python-bsonjs<0.5", "rich<14,>=3.3.2", + "undatum<1.1", ] pymongo = [ "jessiql==1.0.0rc1", diff --git a/tests/io/mongodb/books-canonical.ndjson b/tests/io/mongodb/books-canonical.ndjson new file mode 100644 index 00000000..322d4cfb --- /dev/null +++ b/tests/io/mongodb/books-canonical.ndjson @@ -0,0 +1,4 @@ +{"_id":{"$numberInt":"1"},"title":"Unlocking Android","isbn":"1933988673","pageCount":{"$numberInt":"416"},"publishedDate":{"$date":{"$numberLong":"1238569200000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson.jpg","shortDescription":"Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout.","longDescription":"Android is an open source mobile phone platform based on the Linux operating system and developed by the Open Handset Alliance, a consortium of over 30 hardware, software and telecom companies that focus on open standards for mobile devices. Led by search giant, Google, Android is designed to deliver a better and more open and cost effective mobile experience. Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout. Based on his mobile development experience and his deep knowledge of the arcane Android technical documentation, the author conveys the know-how you need to develop practical applications that build upon or replace any of Androids features, however small. Unlocking Android: A Developer's Guide prepares the reader to embrace the platform in easy-to-understand language and builds on this foundation with re-usable Java code examples. It is ideal for corporate and hobbyists alike who have an interest, or a mandate, to deliver software functionality for cell phones. WHAT'S INSIDE: * Android's place in the market * Using the Eclipse environment for Android development * The Intents - how and why they are used * Application classes: o Activity o Service o IntentReceiver * User interface design * Using the ContentProvider to manage data * Persisting data with the SQLite database * Networking examples * Telephony applications * Notification methods * OpenGL, animation & multimedia * Sample Applications ","status":"PUBLISH","authors":["W. Frank Ableson","Charlie Collins","Robi Sen"],"categories":["Open Source","Mobile"]} +{"_id":{"$numberInt":"2"},"title":"Android in Action, Second Edition","isbn":"1935182722","pageCount":{"$numberInt":"592"},"publishedDate":{"$date":{"$numberLong":"1294992000000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson2.jpg","shortDescription":"Android in Action, Second Edition is a comprehensive tutorial for Android developers. Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. ","longDescription":"When it comes to mobile apps, Android can do almost anything and with this book, so can you! Android runs on mobile devices ranging from smart phones to tablets to countless special-purpose gadgets. It's the broadest mobile platform available. Android in Action, Second Edition is a comprehensive tutorial for Android developers. Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. ","status":"PUBLISH","authors":["W. Frank Ableson","Robi Sen"],"categories":["Java"]} +{"_id":{"$numberInt":"3"},"title":"Specification by Example","isbn":"1617290084","pageCount":{"$numberInt":"0"},"publishedDate":{"$date":{"$numberLong":"1307084400000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/adzic.jpg","status":"PUBLISH","authors":["Gojko Adzic"],"categories":["Software Engineering"]} +{"_id":{"$numberInt":"4"},"title":"Flex 3 in Action","isbn":"1933988746","pageCount":{"$numberInt":"576"},"publishedDate":{"$date":{"$numberLong":"1233561600000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ahmed.jpg","longDescription":"New web applications require engaging user-friendly interfaces and the cooler, the better. With Flex 3, web developers at any skill level can create high-quality, effective, and interactive Rich Internet Applications (RIAs) quickly and easily. Flex removes the complexity barrier from RIA development by offering sophisticated tools and a straightforward programming language so you can focus on what you want to do instead of how to do it. And now that the major components of Flex are free and open-source, the cost barrier is gone, as well! Flex 3 in Action is an easy-to-follow, hands-on Flex tutorial. Chock-full of examples, this book goes beyond feature coverage and helps you put Flex to work in real day-to-day tasks. You'll quickly master the Flex API and learn to apply the techniques that make your Flex applications stand out from the crowd. Interesting themes, styles, and skins It's in there. Working with databases You got it. Interactive forms and validation You bet. Charting techniques to help you visualize data Bam! The expert authors of Flex 3 in Action have one goal to help you get down to business with Flex 3. Fast. Many Flex books are overwhelming to new users focusing on the complexities of the language and the super-specialized subjects in the Flex eco-system; Flex 3 in Action filters out the noise and dives into the core topics you need every day. Using numerous easy-to-understand examples, Flex 3 in Action gives you a strong foundation that you can build on as the complexity of your projects increases.","status":"PUBLISH","authors":["Tariq Ahmed with Jon Hirschi","Faisal Abid"],"categories":["Internet"]} diff --git a/tests/io/mongodb/books-relaxed.ndjson b/tests/io/mongodb/books-relaxed.ndjson new file mode 100644 index 00000000..45db1f27 --- /dev/null +++ b/tests/io/mongodb/books-relaxed.ndjson @@ -0,0 +1,4 @@ +{"_id":1,"title":"Unlocking Android","isbn":"1933988673","pageCount":416,"publishedDate":{"$date":"2009-04-01T07:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson.jpg","shortDescription":"Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout.","longDescription":"Android is an open source mobile phone platform based on the Linux operating system and developed by the Open Handset Alliance, a consortium of over 30 hardware, software and telecom companies that focus on open standards for mobile devices. Led by search giant, Google, Android is designed to deliver a better and more open and cost effective mobile experience. Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout. Based on his mobile development experience and his deep knowledge of the arcane Android technical documentation, the author conveys the know-how you need to develop practical applications that build upon or replace any of Androids features, however small. Unlocking Android: A Developer's Guide prepares the reader to embrace the platform in easy-to-understand language and builds on this foundation with re-usable Java code examples. It is ideal for corporate and hobbyists alike who have an interest, or a mandate, to deliver software functionality for cell phones. WHAT'S INSIDE: * Android's place in the market * Using the Eclipse environment for Android development * The Intents - how and why they are used * Application classes: o Activity o Service o IntentReceiver * User interface design * Using the ContentProvider to manage data * Persisting data with the SQLite database * Networking examples * Telephony applications * Notification methods * OpenGL, animation & multimedia * Sample Applications ","status":"PUBLISH","authors":["W. Frank Ableson","Charlie Collins","Robi Sen"],"categories":["Open Source","Mobile"]} +{"_id":2,"title":"Android in Action, Second Edition","isbn":"1935182722","pageCount":592,"publishedDate":{"$date":"2011-01-14T08:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson2.jpg","shortDescription":"Android in Action, Second Edition is a comprehensive tutorial for Android developers. Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. ","longDescription":"When it comes to mobile apps, Android can do almost anything and with this book, so can you! Android runs on mobile devices ranging from smart phones to tablets to countless special-purpose gadgets. It's the broadest mobile platform available. Android in Action, Second Edition is a comprehensive tutorial for Android developers. Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. ","status":"PUBLISH","authors":["W. Frank Ableson","Robi Sen"],"categories":["Java"]} +{"_id":3,"title":"Specification by Example","isbn":"1617290084","pageCount":0,"publishedDate":{"$date":"2011-06-03T07:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/adzic.jpg","status":"PUBLISH","authors":["Gojko Adzic"],"categories":["Software Engineering"]} +{"_id":4,"title":"Flex 3 in Action","isbn":"1933988746","pageCount":576,"publishedDate":{"$date":"2009-02-02T08:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ahmed.jpg","longDescription":"New web applications require engaging user-friendly interfaces and the cooler, the better. With Flex 3, web developers at any skill level can create high-quality, effective, and interactive Rich Internet Applications (RIAs) quickly and easily. Flex removes the complexity barrier from RIA development by offering sophisticated tools and a straightforward programming language so you can focus on what you want to do instead of how to do it. And now that the major components of Flex are free and open-source, the cost barrier is gone, as well! Flex 3 in Action is an easy-to-follow, hands-on Flex tutorial. Chock-full of examples, this book goes beyond feature coverage and helps you put Flex to work in real day-to-day tasks. You'll quickly master the Flex API and learn to apply the techniques that make your Flex applications stand out from the crowd. Interesting themes, styles, and skins It's in there. Working with databases You got it. Interactive forms and validation You bet. Charting techniques to help you visualize data Bam! The expert authors of Flex 3 in Action have one goal to help you get down to business with Flex 3. Fast. Many Flex books are overwhelming to new users focusing on the complexities of the language and the super-specialized subjects in the Flex eco-system; Flex 3 in Action filters out the noise and dives into the core topics you need every day. Using numerous easy-to-understand examples, Flex 3 in Action gives you a strong foundation that you can build on as the complexity of your projects increases.","status":"PUBLISH","authors":["Tariq Ahmed with Jon Hirschi","Faisal Abid"],"categories":["Internet"]} diff --git a/tests/io/mongodb/books.bson.gz b/tests/io/mongodb/books.bson.gz new file mode 100644 index 0000000000000000000000000000000000000000..d8fc1bd4daab66770180026b99a08a80dfafc197 GIT binary patch literal 2452 zcmV;F32XKriwFpkecxsP17dG)YjZAQb8l_{?O0uJ97z>zo4^vmLqJ*yBt9yU*sw4& zN$kxhJnSS+VkdU++71>^P}5!0U77BxPF1zXlP4s^1Al=h;J5Hs_#d2mt9yDT4hEz} z;$i(VJ>}|K_v74iPtD)Ho+Q_PtFvS?Ns`T=L#dJ*cpRI|S)B=O#5fJwnt?c0sWq8+ znrZr;Y-&F-$+xig&cp4ycOGor-`Yy9)iPI4Y-2)l2#Z(iW>RWjsBA|Dl_Y=v(;tQ# zbWYJ!6C-u`+Lg%xOcWv1{?V;letRS<`OX?S_oLKSx5h8`#xI`SJ=!1d?7u$v_0i$8 zoi9ccYiC2YG1L_f^ta@sRNk7c(VIF?Hhp1T*ik-px@KShy73=GB(xV&;LzHsLK&~m zln70*PRmkEtrMMvOr5FH*2?*jIFU1La znW{`s1fp(&=QrU}D3r(?j&wJ@@Ja?DvyjCAbX7v0p&`iT2#$-d;QId#R}aS< zpL`Sab@=-6{@(F!az!=)+~boWvFYvPJNP&qiD$skOpGH(C7XAWt5`om`B74nBv(=j z9EDS!-@uDt<>Unbfsa34Z5eu_XXx+Z@wF*!%GGQ`T4&_#hYudyyT854$bZ?mz{tP< z^xf~o1xDWg@)-GF5b}>UJ`lhoF%~-=SW%$ojn2>?&Yt1ZGkCOD15uzTen=f)0Gfuo zgAq?a=}=EL04!Md3!tdtO17;)paVK=NgdG{-$n-AdSp_jJODz%mfceIFWJfoZuv1|{0Al+# zD&jI^K@qu513j_Xph{3y#Uv$-<_JL*Q=2xNmlyz2o#B1YC0r-qk=Tzufnrb#k;9KL z5YMg6OV!gq(gA?z++Z`BK2s$z6TF?MKm!p?d87}Y_59a#;hu1h?SsTPKDnlv>5v7ZkR59C?>%1AXS-K8C!dSx&>afA|bWVDKvOvk?dr$(4Gc#sb@;e3kxux(LZSl zDg=S{v(#D@b!`CHR~l~F{L{uT=wT%+;yF+lt#;zno$iiLZXS!hSI2uhPalc)?`JFH zIhLc<8jxNyFj2edHBWt7f0~xMhRjmtOgn2BcpuO{yxzWig4gyCi$Rn_Dryvo1ry&w zkoyfGyK|_=jYv!Bz4A+U>Cdt}JEJ()R*#h<3R_={=N_q4alZO^@P+d`rI;g^T@3b# z<<}1*-!O(y1%d@CA-nyD01Zk=FY$6Ke<7j1*auheBF%+v|0@;d*3DMc%`#QsZ>rg1 zy?eSBZP3#$HI+hRX3PK4{CK`kiVSjqH+&+hrVP4LnO?ej%*U;!7zQpERy&sqt7}+) zf@xH0N*Csq@yX`aC3@?VXz$zj;F)*KqXf2@pw#ff0+af zs;4o9t6J>do%>r4Z{L3K*{T+!zxTD+PYzCQU(jMNz6>pvz0+x{#4gG0_xfBsw{K=Z zHodo=uj_Ho#}s?l?N8~Fq(Js2$8z~mo2n~)s;FGgN_8%_`}@pAxqAPzd#fq>`~yEA ze4gC<@&6`81&|q2^uF-=0fg5pH5YR=S?>d<-Zt71`Q$Qh?+k7@rJp@|t3dpqC2__~F}zX?D=-nL&4L zQ9S;@-$xqiiP*;(5_GFk{PbvV>^~Ir+FSrj#V$VG($A(cB>RrJc%;ZZ(5y8 z)=6~tC#i+eO0%0dym+cM!E0=CIFh1O4 z+sivO^+3EOM5*?+>4beHp&4+xYxB7ZvnXVs93x^lqt4DVRs(yjwrD3^MPai}^%2T| zKaB?d4J9f9M-u(Com)KOy19mZhzPj~T>fagZ3B{65q)RqnN~INcJw0l(&&gztVCux zt(?^}?Wx(`brbiotWbGU6s&ebt517Kt)hX26kX49i#muk!#TP#vvbbxgnrbZ-;(A% zFQ6Au7S7cH@d2+ACOo5-FffNPZXK8~perIHVjk`qT5qp`%O0ltSS!KT^d&9$2H_0b zAF~iIO3+N+fL*%$M-Onj8^+tqFD}MA)nyix%*6N4{(epn4fEN_v2`X SgiJ@{c>E`N1@^SQ7XSbvsH~p= literal 0 HcmV?d00001 diff --git a/tests/io/mongodb/conftest.py b/tests/io/mongodb/conftest.py index 419b98e6..0e38c79f 100644 --- a/tests/io/mongodb/conftest.py +++ b/tests/io/mongodb/conftest.py @@ -11,6 +11,7 @@ pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed") pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") pytest.importorskip("rich", reason="Skipping tests because rich is not installed") +pytest.importorskip("undatum", reason="Skipping tests because undatum is not installed") # Define databases to be deleted before running each test case. diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py new file mode 100644 index 00000000..fad770b9 --- /dev/null +++ b/tests/io/mongodb/test_copy.py @@ -0,0 +1,115 @@ +import pytest + +from cratedb_toolkit.io.mongodb.api import mongodb_copy +from tests.conftest import check_sqlalchemy2 + +pytestmark = pytest.mark.mongodb + + +@pytest.fixture(scope="module", autouse=True) +def check_prerequisites(): + """ + This subsystem needs SQLAlchemy 2.x. + """ + check_sqlalchemy2() + + +def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb): + """ + Verify MongoDB Extended JSON -> CrateDB data transfer. + """ + + # Define source and target URLs. + json_resource = "file+bson:./tests/io/mongodb/books-relaxed.ndjson" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + mongodb_copy(json_resource, cratedb_url) + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 4 + + # Verify content in target database. + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 1;", records=True) + assert results[0]["data"]["authors"] == [ + "W. Frank Ableson", + "Charlie Collins", + "Robi Sen", + ] + + # Verify schema in target database. + type_result = cratedb.database.run_sql( + "SELECT pg_typeof(data['publishedDate']) AS type FROM testdrive.demo;", records=True + ) + timestamp_type = type_result[0]["type"] + assert timestamp_type == "bigint" + + +def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb): + """ + Verify MongoDB Extended JSON -> CrateDB data transfer. + """ + + # Define source and target URLs. + json_resource = "file+bson:./tests/io/mongodb/books-canonical.ndjson" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + mongodb_copy(json_resource, cratedb_url) + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 4 + + # Verify content in target database. + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 1;", records=True) + assert results[0]["data"]["authors"] == [ + "W. Frank Ableson", + "Charlie Collins", + "Robi Sen", + ] + + # Verify schema in target database. + type_result = cratedb.database.run_sql( + "SELECT pg_typeof(data['publishedDate']) AS type FROM testdrive.demo;", records=True + ) + timestamp_type = type_result[0]["type"] + + # FIXME: Why does the "canonical format" yield worse results? + assert timestamp_type == "text" + + +def test_mongodb_copy_filesystem_bson(caplog, cratedb): + """ + Verify MongoDB BSON -> CrateDB data transfer. + """ + + # Define source and target URLs. + json_resource = "file+bson:./tests/io/mongodb/books.bson.gz" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + mongodb_copy(json_resource, cratedb_url) + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 4 + + # Verify content in target database. + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 1;", records=True) + assert results[0]["data"]["authors"] == [ + "W. Frank Ableson", + "Charlie Collins", + "Robi Sen", + ] + + # Verify schema in target database. + type_result = cratedb.database.run_sql( + "SELECT pg_typeof(data['publishedDate']) AS type FROM testdrive.demo;", records=True + ) + timestamp_type = type_result[0]["type"] + assert timestamp_type == "bigint"