Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB: Add Zyp transformations to CDC subsystem #288

Merged
merged 2 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- MongoDB: Added Zyp transformations to the CDC subsystem,
making it more symmetric to the full-load procedure.

## 2024/10/09 v0.0.28
- IO: Improved `BulkProcessor` when running per-record operations by
Expand Down
6 changes: 5 additions & 1 deletion cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@

from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
return mongodb_relay_cdc(

Check warning on line 159 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L159

Added line #L159 was not covered by tests
source_url_obj,
target_url,
transformation=transformation,
)
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

Expand Down
54 changes: 48 additions & 6 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import bson
import pymongo
import pymongo.collection
import pymongo.database
import yarl
from attrs import define, field
from boltons.urlutils import URL
from bson.raw_bson import RawBSONDocument
from undatum.common.iterable import IterableData

from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.util import batches
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.data import asbool
from cratedb_toolkit.util.io import read_json

logger = logging.getLogger(__name__)
Expand All @@ -32,7 +35,11 @@
database_name: str
collection_name: str

_custom_query_parameters = ["batch-size", "filter", "limit", "offset"]
_custom_query_parameters = ["batch-size", "direct", "filter", "limit", "offset", "timeout"]
_default_timeout = 5000

direct: bool = False
timeout: int = _default_timeout

@classmethod
def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
Expand All @@ -42,13 +49,17 @@
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table
direct = asbool(mongodb_uri.query_params.pop("direct", False))
timeout = mongodb_uri.query_params.pop("timeout", cls._default_timeout)
for custom_query_parameter in cls._custom_query_parameters:
mongodb_uri.query_params.pop(custom_query_parameter, None)
return cls(
address=mongodb_address,
effective_url=mongodb_uri,
database_name=mongodb_database,
collection_name=mongodb_collection,
direct=direct,
timeout=timeout,
)

def __attrs_post_init__(self):
Expand All @@ -75,7 +86,7 @@
raise NotImplementedError()

@abstractmethod
def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
raise NotImplementedError()

@abstractmethod
Expand All @@ -86,6 +97,10 @@
def query(self):
raise NotImplementedError()

@abstractmethod
def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
raise NotImplementedError()

Check warning on line 102 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L102

Added line #L102 was not covered by tests


@define
class MongoDBFilesystemAdapter(MongoDBAdapterBase):
Expand All @@ -94,7 +109,7 @@
def setup(self):
self._path = Path(self.address.uri.path)

def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
return sorted(glob.glob(str(self._path)))

def record_count(self, filter_=None) -> int:
Expand Down Expand Up @@ -122,6 +137,9 @@
raise ValueError(f"Unsupported file type: {self._path.suffix}")
return batches(data, self.batch_size)

def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
raise NotImplementedError("Subscribing to a change stream is not supported by filesystem adapter")

Check warning on line 141 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L141

Added line #L141 was not covered by tests


@define
class MongoDBResourceAdapter(MongoDBAdapterBase):
Expand All @@ -132,7 +150,7 @@
if "+bson" in self._url.scheme:
self._url.scheme = self._url.scheme.replace("+bson", "")

def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
raise NotImplementedError("HTTP+BSON loader does not support directory inquiry yet")

def record_count(self, filter_=None) -> int:
Expand All @@ -153,22 +171,36 @@
raise ValueError(f"Unsupported file type: {self._url}")
return batches(data, self.batch_size)

def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
raise NotImplementedError("HTTP+BSON loader does not support subscribing to a change stream")

Check warning on line 175 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L175

Added line #L175 was not covered by tests


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
_mongodb_client: pymongo.MongoClient = field(init=False)
_mongodb_database: pymongo.database.Database = field(init=False)
_mongodb_collection: pymongo.collection.Collection = field(init=False)

def setup(self):
self._mongodb_client: pymongo.MongoClient = pymongo.MongoClient(
str(self.effective_url),
document_class=RawBSONDocument,
datetime_conversion="DATETIME_AUTO",
directConnection=self.direct,
socketTimeoutMS=self.timeout,
connectTimeoutMS=self.timeout,
serverSelectionTimeoutMS=self.timeout,
)
if self.database_name:
self._mongodb_database = self._mongodb_client.get_database(self.database_name)
if self.collection_name:
self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name]
self._mongodb_collection = self._mongodb_database.get_collection(self.collection_name)

@property
def collection(self):
return self._mongodb_collection

Check warning on line 201 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L201

Added line #L201 was not covered by tests

def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
database = self._mongodb_client.get_database(self.database_name)
return sorted(database.list_collection_names())

Expand All @@ -193,6 +225,16 @@
)
return batches(data, self.batch_size)

def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
return self._mongodb_collection.watch(

Check warning on line 229 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L229

Added line #L229 was not covered by tests
full_document="updateLookup", batch_size=self.batch_size, resume_after=resume_after
)

def create_collection(self):
self._mongodb_database.create_collection(self.collection_name)
self._mongodb_collection = self._mongodb_database.get_collection(self.collection_name)
return self._mongodb_collection

Check warning on line 236 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L234-L236

Added lines #L234 - L236 were not covered by tests


def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
if mongodb_uri.scheme.startswith("file"):
Expand Down
26 changes: 11 additions & 15 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
address_pair_root = AddressPair(source_url=source_url, target_url=target_url)

mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url)
collections = mongodb_adapter.get_collections()
collections = mongodb_adapter.get_collection_names()
logger.info(f"Discovered collections: {len(collections)}")
logger.debug(f"Processing collections: {collections}")

Expand Down Expand Up @@ -175,7 +175,11 @@
return outcome


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
def mongodb_relay_cdc(
source_url,
target_url,
transformation: t.Union[Path, TransformationManager, TransformationProject, None] = None,
):
"""
Synopsis
--------
Expand All @@ -191,22 +195,14 @@
"""
logger.info("Running MongoDB CDC relay")

# Decode database URL.
mongodb_address = DatabaseAddress.from_string(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table

cratedb_address = DatabaseAddress.from_string(target_url)
cratedb_uri, cratedb_table_address = cratedb_address.decode()
# Optionally configure transformations.
tm = TransformationManager.from_any(transformation)

Check warning on line 199 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L199

Added line #L199 was not covered by tests

# Configure machinery.
relay = MongoDBCDCRelayCrateDB(
mongodb_url=str(mongodb_uri),
mongodb_database=mongodb_database,
mongodb_collection=mongodb_collection,
cratedb_sqlalchemy_url=str(cratedb_uri),
cratedb_table=cratedb_table_address.fullname,
mongodb_url=source_url,
cratedb_url=target_url,
tm=tm,
)

# Invoke machinery.
Expand Down
115 changes: 92 additions & 23 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
"""
Basic relaying of a MongoDB Change Stream into CrateDB table.
Relay a MongoDB Change Stream into a CrateDB table.

Documentation:
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
- https://www.mongodb.com/docs/manual/changeStreams/
- https://www.mongodb.com/developer/languages/python/python-change-streams/
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
"""

import logging
import typing as t

import pymongo
import pymongo.errors
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslator
from boltons.urlutils import URL
from commons_codec.transform.mongodb import MongoDBCDCTranslator, MongoDBCrateDBConverter
from pymongo.change_stream import CollectionChangeStream
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.process import FixedBackoff

logger = logging.getLogger(__name__)

Expand All @@ -25,17 +35,51 @@

def __init__(
self,
mongodb_url: str,
mongodb_database: str,
mongodb_collection: str,
cratedb_sqlalchemy_url: str,
cratedb_table: str,
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "ignore",
debug: bool = True,
):
self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslator(table_name=self.table_name)
self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Check warning on line 45 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L44-L45

Added lines #L44 - L45 were not covered by tests

logger.info(f"Initializing MongoDB CDC Relay. mongodb={mongodb_url}, cratedb={cratedb_url}")

Check warning on line 47 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L47

Added line #L47 was not covered by tests

# Decode database URL: MongoDB.
self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri)

Check warning on line 50 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L50

Added line #L50 was not covered by tests

# Decode database URL: CrateDB.
self.cratedb_address = DatabaseAddress(self.cratedb_uri)
self.cratedb_sqlalchemy_url, self.cratedb_table_address = self.cratedb_address.decode()
cratedb_table = self.cratedb_table_address.fullname

Check warning on line 55 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L53-L55

Added lines #L53 - L55 were not covered by tests

self.cratedb_adapter = DatabaseAdapter(str(self.cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)

Check warning on line 58 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L57-L58

Added lines #L57 - L58 were not covered by tests

# Transformation machinery.
transformation = None
if tm:
address = CollectionAddress(

Check warning on line 63 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L61-L63

Added lines #L61 - L63 were not covered by tests
container=self.mongodb_adapter.database_name, name=self.mongodb_adapter.collection_name
)
try:
transformation = tm.project.get(address=address)
logger.info(f"Applying transformation to: {address}")
except KeyError:
logger.warning(f"No transformation found for: {address}")
self.converter = MongoDBCrateDBConverter(

Check warning on line 71 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L66-L71

Added lines #L66 - L71 were not covered by tests
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)

self.cdc = MongoDBCDCTranslator(table_name=self.cratedb_table, converter=self.converter)
self.ccs: CollectionChangeStream

Check warning on line 78 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L77-L78

Added lines #L77 - L78 were not covered by tests

self.on_error = on_error
self.debug = debug
self.stopping: bool = False

Check warning on line 82 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L80-L82

Added lines #L80 - L82 were not covered by tests

def start(self):
"""
Expand All @@ -44,18 +88,43 @@
# FIXME: Note that the function does not perform any sensible error handling yet.
with self.cratedb_adapter.engine.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for operation in self.cdc_to_sql():
for event in self.consume():
operation = self.cdc.to_sql(event)

Check warning on line 92 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L91-L92

Added lines #L91 - L92 were not covered by tests
if operation:
connection.execute(sa.text(operation.statement), operation.parameters)

def cdc_to_sql(self):
def stop(self):
self.stopping = True
self.ccs._closed = True

Check warning on line 98 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L97-L98

Added lines #L97 - L98 were not covered by tests

def consume(self, resume_after: t.Optional[DocumentDict] = None):
"""
Subscribe to change stream events, and emit corresponding SQL statements.
Subscribe to change stream events, and emit change events.
"""
# Note that `.watch()` will block until events are ready for consumption, so
# this is not a busy loop.
# FIXME: Note that the function does not perform any sensible error handling yet.
while True:
with self.mongodb_collection.watch(full_document="updateLookup") as change_stream:
for change in change_stream:
yield self.cdc.to_sql(change)
self.ccs = self.mongodb_adapter.subscribe_cdc(resume_after=resume_after)

Check warning on line 104 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L104

Added line #L104 was not covered by tests

if self.stopping:
return

Check warning on line 107 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L106-L107

Added lines #L106 - L107 were not covered by tests

backoff = FixedBackoff(sequence=[2, 4, 6, 8, 10])
resume_token = None
try:
with self.ccs as stream:
for event in stream:
yield event
resume_token = stream.resume_token
backoff.reset()
except pymongo.errors.PyMongoError:

Check warning on line 117 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L109-L117

Added lines #L109 - L117 were not covered by tests
# The ChangeStream encountered an unrecoverable error or the
# resume attempt failed to recreate the cursor.
if resume_token is None:

Check warning on line 120 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L120

Added line #L120 was not covered by tests
# There is no usable resume token because there was a
# failure during ChangeStream initialization.
logger.exception("Initializing change stream failed")

Check warning on line 123 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L123

Added line #L123 was not covered by tests
else:
# Use the interrupted ChangeStream's resume token to create
# a new ChangeStream. The new stream will continue from the
# last seen insert change without missing any events.
backoff.next()
logger.info("Resuming change stream")
self.consume(resume_after=resume_after)

Check warning on line 130 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L128-L130

Added lines #L128 - L130 were not covered by tests
Loading