diff --git a/CHANGES.md b/CHANGES.md index d59d899..e586e13 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changelog ## Unreleased +- MongoDB: Added Zyp transformations to the CDC subsystem, + making it more symmetric to the full-load procedure. ## 2024/10/09 v0.0.28 - IO: Improved `BulkProcessor` when running per-record operations by diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 078ac02..e51f2e3 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -156,7 +156,11 @@ def load_table( from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc - return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) + return mongodb_relay_cdc( + source_url_obj, + target_url, + transformation=transformation, + ) else: from cratedb_toolkit.io.mongodb.api import mongodb_copy diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py index fd7b4b0..acb8fc4 100644 --- a/cratedb_toolkit/io/mongodb/adapter.py +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -86,6 +86,10 @@ def record_count(self, filter_=None) -> int: def query(self): raise NotImplementedError() + @abstractmethod + def subscribe(self): + raise NotImplementedError() + @define class MongoDBFilesystemAdapter(MongoDBAdapterBase): @@ -122,6 +126,9 @@ def query(self): raise ValueError(f"Unsupported file type: {self._path.suffix}") return batches(data, self.batch_size) + def subscribe(self): + raise NotImplementedError("Subscribing to a change stream is not supported by filesystem adapter") + @define class MongoDBResourceAdapter(MongoDBAdapterBase): @@ -153,6 +160,9 @@ def query(self): raise ValueError(f"Unsupported file type: {self._url}") return batches(data, self.batch_size) + def subscribe(self): + raise NotImplementedError("HTTP+BSON loader does not support subscribing to a change stream") + @define class MongoDBServerAdapter(MongoDBAdapterBase): @@ -193,6 +203,9 @@ def query(self): ) return batches(data, self.batch_size) + def subscribe(self): + return self._mongodb_collection.watch(full_document="updateLookup") + def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase: if mongodb_uri.scheme.startswith("file"): diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 1b946f3..dd68abf 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -175,7 +175,11 @@ def mongodb_copy( return outcome -def mongodb_relay_cdc(source_url, target_url, progress: bool = False): +def mongodb_relay_cdc( + source_url, + target_url, + transformation: t.Union[Path, TransformationManager, TransformationProject, None] = None, +): """ Synopsis -------- @@ -191,22 +195,14 @@ def mongodb_relay_cdc(source_url, target_url, progress: bool = False): """ logger.info("Running MongoDB CDC relay") - # Decode database URL. - mongodb_address = DatabaseAddress.from_string(source_url) - mongodb_uri, mongodb_collection_address = mongodb_address.decode() - mongodb_database = mongodb_collection_address.schema - mongodb_collection = mongodb_collection_address.table - - cratedb_address = DatabaseAddress.from_string(target_url) - cratedb_uri, cratedb_table_address = cratedb_address.decode() + # Optionally configure transformations. + tm = TransformationManager.from_any(transformation) # Configure machinery. relay = MongoDBCDCRelayCrateDB( - mongodb_url=str(mongodb_uri), - mongodb_database=mongodb_database, - mongodb_collection=mongodb_collection, - cratedb_sqlalchemy_url=str(cratedb_uri), - cratedb_table=cratedb_table_address.fullname, + mongodb_url=source_url, + cratedb_url=target_url, + tm=tm, ) # Invoke machinery. diff --git a/cratedb_toolkit/io/mongodb/cdc.py b/cratedb_toolkit/io/mongodb/cdc.py index 30c0c86..9b802c7 100644 --- a/cratedb_toolkit/io/mongodb/cdc.py +++ b/cratedb_toolkit/io/mongodb/cdc.py @@ -8,11 +8,16 @@ """ import logging +import typing as t -import pymongo import sqlalchemy as sa -from commons_codec.transform.mongodb import MongoDBCDCTranslator +from boltons.urlutils import URL +from commons_codec.transform.mongodb import MongoDBCDCTranslator, MongoDBCrateDBConverter +from zyp.model.collection import CollectionAddress +from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory +from cratedb_toolkit.io.mongodb.transform import TransformationManager +from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util import DatabaseAdapter logger = logging.getLogger(__name__) @@ -25,17 +30,47 @@ class MongoDBCDCRelayCrateDB: def __init__( self, - mongodb_url: str, - mongodb_database: str, - mongodb_collection: str, - cratedb_sqlalchemy_url: str, - cratedb_table: str, + mongodb_url: t.Union[str, URL], + cratedb_url: t.Union[str, URL], + tm: t.Union[TransformationManager, None], + on_error: t.Literal["ignore", "raise"] = "ignore", + debug: bool = True, ): - self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True) - self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url) - self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] - self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table) - self.cdc = MongoDBCDCTranslator(table_name=self.table_name) + self.mongodb_uri = URL(mongodb_url) + self.cratedb_uri = URL(cratedb_url) + + # Decode database URL: MongoDB. + self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri) + + # Decode database URL: CrateDB. + self.cratedb_address = DatabaseAddress(self.cratedb_uri) + self.cratedb_sqlalchemy_url, self.cratedb_table_address = self.cratedb_address.decode() + cratedb_table = self.cratedb_table_address.fullname + + self.cratedb_adapter = DatabaseAdapter(str(self.cratedb_sqlalchemy_url), echo=False) + self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) + + # Transformation machinery. + transformation = None + if tm: + address = CollectionAddress( + container=self.mongodb_adapter.database_name, name=self.mongodb_adapter.collection_name + ) + try: + transformation = tm.project.get(address=address) + logger.info(f"Applying transformation to: {address}") + except KeyError: + logger.warning(f"No transformation found for: {address}") + self.converter = MongoDBCrateDBConverter( + timestamp_to_epoch=True, + timestamp_use_milliseconds=True, + transformation=transformation, + ) + + self.cdc = MongoDBCDCTranslator(table_name=self.cratedb_table, converter=self.converter) + + self.on_error = on_error + self.debug = debug def start(self): """ @@ -52,10 +87,10 @@ def cdc_to_sql(self): """ Subscribe to change stream events, and emit corresponding SQL statements. """ - # Note that `.watch()` will block until events are ready for consumption, so - # this is not a busy loop. + # Note that `.subscribe()` (calling `.watch()`) will block until events are ready + # for consumption, so this is not a busy loop. # FIXME: Note that the function does not perform any sensible error handling yet. while True: - with self.mongodb_collection.watch(full_document="updateLookup") as change_stream: + with self.mongodb_adapter.subscribe() as change_stream: for change in change_stream: yield self.cdc.to_sql(change) diff --git a/doc/io/mongodb/cdc.md b/doc/io/mongodb/cdc.md index 8010efb..803544e 100644 --- a/doc/io/mongodb/cdc.md +++ b/doc/io/mongodb/cdc.md @@ -111,6 +111,12 @@ crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-c ``` +## Transformations +You can use [Zyp Transformations] to change the shape of the data while being +transferred. In order to add it to the pipeline, use the `--transformation` +command line option. + + ## Appendix A few operations that are handy when exploring this exercise. @@ -154,3 +160,4 @@ mongosh "${MONGODB_URL}" --eval 'db.demo.drop()' [MongoDB Atlas]: https://www.mongodb.com/atlas [MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/ [SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81 +[Zyp Transformations]: https://commons-codec.readthedocs.io/zyp/index.html diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md index ad638bf..0c6cc3f 100644 --- a/doc/io/mongodb/loader.md +++ b/doc/io/mongodb/loader.md @@ -126,7 +126,7 @@ Use the HTTP URL query parameter `offset` on the source URL, like `&offset=42`, in order to start processing at this record from the beginning. -## Zyp Transformations +## Transformations You can use [Zyp Transformations] to change the shape of the data while being transferred. In order to add it to the pipeline, use the `--transformation` command line option.