Skip to content

Commit

Permalink
MongoDB: Add Zyp transformations to the CDC subsystem
Browse files Browse the repository at this point in the history
... making it more symmetric to the full-load procedure.
  • Loading branch information
amotl committed Oct 9, 2024
1 parent 88be2d2 commit 4b3f5e0
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 31 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- MongoDB: Added Zyp transformations to the CDC subsystem,
making it more symmetric to the full-load procedure.

## 2024/10/09 v0.0.28
- IO: Improved `BulkProcessor` when running per-record operations by
Expand Down
6 changes: 5 additions & 1 deletion cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ def load_table(

from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
return mongodb_relay_cdc(

Check warning on line 159 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L159

Added line #L159 was not covered by tests
source_url_obj,
target_url,
transformation=transformation,
)
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

Expand Down
13 changes: 13 additions & 0 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def record_count(self, filter_=None) -> int:
def query(self):
raise NotImplementedError()

@abstractmethod
def subscribe(self):
raise NotImplementedError()

Check warning on line 91 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L91

Added line #L91 was not covered by tests


@define
class MongoDBFilesystemAdapter(MongoDBAdapterBase):
Expand Down Expand Up @@ -122,6 +126,9 @@ def query(self):
raise ValueError(f"Unsupported file type: {self._path.suffix}")
return batches(data, self.batch_size)

def subscribe(self):
raise NotImplementedError("Subscribing to a change stream is not supported by filesystem adapter")

Check warning on line 130 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L130

Added line #L130 was not covered by tests


@define
class MongoDBResourceAdapter(MongoDBAdapterBase):
Expand Down Expand Up @@ -153,6 +160,9 @@ def query(self):
raise ValueError(f"Unsupported file type: {self._url}")
return batches(data, self.batch_size)

def subscribe(self):
raise NotImplementedError("HTTP+BSON loader does not support subscribing to a change stream")

Check warning on line 164 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L164

Added line #L164 was not covered by tests


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
Expand Down Expand Up @@ -193,6 +203,9 @@ def query(self):
)
return batches(data, self.batch_size)

def subscribe(self):
return self._mongodb_collection.watch(full_document="updateLookup")

Check warning on line 207 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L207

Added line #L207 was not covered by tests


def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
if mongodb_uri.scheme.startswith("file"):
Expand Down
24 changes: 10 additions & 14 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ def mongodb_copy(
return outcome


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
def mongodb_relay_cdc(
source_url,
target_url,
transformation: t.Union[Path, TransformationManager, TransformationProject, None] = None,
):
"""
Synopsis
--------
Expand All @@ -191,22 +195,14 @@ def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
"""
logger.info("Running MongoDB CDC relay")

# Decode database URL.
mongodb_address = DatabaseAddress.from_string(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table

cratedb_address = DatabaseAddress.from_string(target_url)
cratedb_uri, cratedb_table_address = cratedb_address.decode()
# Optionally configure transformations.
tm = TransformationManager.from_any(transformation)

Check warning on line 199 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L199

Added line #L199 was not covered by tests

# Configure machinery.
relay = MongoDBCDCRelayCrateDB(
mongodb_url=str(mongodb_uri),
mongodb_database=mongodb_database,
mongodb_collection=mongodb_collection,
cratedb_sqlalchemy_url=str(cratedb_uri),
cratedb_table=cratedb_table_address.fullname,
mongodb_url=source_url,
cratedb_url=target_url,
tm=tm,
)

# Invoke machinery.
Expand Down
65 changes: 50 additions & 15 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
"""

import logging
import typing as t

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslator
from boltons.urlutils import URL
from commons_codec.transform.mongodb import MongoDBCDCTranslator, MongoDBCrateDBConverter
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)
Expand All @@ -25,17 +30,47 @@ class MongoDBCDCRelayCrateDB:

def __init__(
self,
mongodb_url: str,
mongodb_database: str,
mongodb_collection: str,
cratedb_sqlalchemy_url: str,
cratedb_table: str,
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "ignore",
debug: bool = True,
):
self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslator(table_name=self.table_name)
self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Check warning on line 40 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L39-L40

Added lines #L39 - L40 were not covered by tests

# Decode database URL: MongoDB.
self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri)

Check warning on line 43 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L43

Added line #L43 was not covered by tests

# Decode database URL: CrateDB.
self.cratedb_address = DatabaseAddress(self.cratedb_uri)
self.cratedb_sqlalchemy_url, self.cratedb_table_address = self.cratedb_address.decode()
cratedb_table = self.cratedb_table_address.fullname

Check warning on line 48 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L46-L48

Added lines #L46 - L48 were not covered by tests

self.cratedb_adapter = DatabaseAdapter(str(self.cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)

Check warning on line 51 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L50-L51

Added lines #L50 - L51 were not covered by tests

# Transformation machinery.
transformation = None
if tm:
address = CollectionAddress(

Check warning on line 56 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L54-L56

Added lines #L54 - L56 were not covered by tests
container=self.mongodb_adapter.database_name, name=self.mongodb_adapter.collection_name
)
try:
transformation = tm.project.get(address=address)
logger.info(f"Applying transformation to: {address}")
except KeyError:
logger.warning(f"No transformation found for: {address}")
self.converter = MongoDBCrateDBConverter(

Check warning on line 64 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L59-L64

Added lines #L59 - L64 were not covered by tests
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)

self.cdc = MongoDBCDCTranslator(table_name=self.cratedb_table, converter=self.converter)

Check warning on line 70 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L70

Added line #L70 was not covered by tests

self.on_error = on_error
self.debug = debug

Check warning on line 73 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L72-L73

Added lines #L72 - L73 were not covered by tests

def start(self):
"""
Expand All @@ -52,10 +87,10 @@ def cdc_to_sql(self):
"""
Subscribe to change stream events, and emit corresponding SQL statements.
"""
# Note that `.watch()` will block until events are ready for consumption, so
# this is not a busy loop.
# Note that `.subscribe()` (calling `.watch()`) will block until events are ready
# for consumption, so this is not a busy loop.
# FIXME: Note that the function does not perform any sensible error handling yet.
while True:
with self.mongodb_collection.watch(full_document="updateLookup") as change_stream:
with self.mongodb_adapter.subscribe() as change_stream:

Check warning on line 94 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L94

Added line #L94 was not covered by tests
for change in change_stream:
yield self.cdc.to_sql(change)
7 changes: 7 additions & 0 deletions doc/io/mongodb/cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-c
```


## Transformations
You can use [Zyp Transformations] to change the shape of the data while being
transferred. In order to add it to the pipeline, use the `--transformation`
command line option.


## Appendix
A few operations that are handy when exploring this exercise.

Expand Down Expand Up @@ -154,3 +160,4 @@ mongosh "${MONGODB_URL}" --eval 'db.demo.drop()'
[MongoDB Atlas]: https://www.mongodb.com/atlas
[MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/
[SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81
[Zyp Transformations]: https://commons-codec.readthedocs.io/zyp/index.html
2 changes: 1 addition & 1 deletion doc/io/mongodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Use the HTTP URL query parameter `offset` on the source URL, like
`&offset=42`, in order to start processing at this record from the
beginning.

## Zyp Transformations
## Transformations
You can use [Zyp Transformations] to change the shape of the data while being
transferred. In order to add it to the pipeline, use the `--transformation`
command line option.
Expand Down

0 comments on commit 4b3f5e0

Please sign in to comment.