Skip to content

Commit

Permalink
MongoDB: Unlock importing MongoDB Extended JSON using file+bson://...
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 10, 2024
1 parent 4297f5d commit cf8213d
Show file tree
Hide file tree
Showing 17 changed files with 526 additions and 103 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- MongoDB: Add `--treatment` option for applying special treatments to certain items
on real-world data
- MongoDB: Use pagination on source collection, for creating batches towards CrateDB
- MongoDB: Unlock importing MongoDB Extended JSON files using `file+bson://...`

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
68 changes: 44 additions & 24 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import abstractmethod
from pathlib import Path

from yarl import URL
from boltons.urlutils import URL

from cratedb_toolkit.api.guide import GuidingTexts
from cratedb_toolkit.cluster.util import get_cluster_info
Expand Down Expand Up @@ -113,46 +113,66 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""
source_url = resource.url
source_url_obj = URL(resource.url)
target_url = self.address.dburi
source_url_obj = URL(source_url)
if source_url.startswith("dynamodb"):

if source_url_obj.scheme.startswith("dynamodb"):
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy

if not dynamodb_copy(source_url, target_url, progress=True):
if not dynamodb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)

elif source_url.startswith("influxdb"):
elif source_url_obj.scheme.startswith("file"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
mongodb_copy_generic(

Check warning on line 129 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L128-L129

Added lines #L128 - L129 were not covered by tests
str(source_url_obj),
target_url,
transformation=transformation,
progress=True,
)

elif source_url_obj.scheme.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http://"
if asbool(source_url_obj.query.get("ssl")):
http_scheme = "https://"
source_url = source_url.replace("influxdb2://", http_scheme)
if not influxdb_copy(source_url, target_url, progress=True):
http_scheme = "http"
if asbool(source_url_obj.query_params.get("ssl")):
http_scheme = "https"

Check warning on line 141 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L141

Added line #L141 was not covered by tests
source_url_obj.scheme = source_url_obj.scheme.replace("influxdb2", http_scheme)
if not influxdb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
elif source_url.startswith("mongodb"):
if "+cdc" in source_url:
source_url = source_url.replace("+cdc", "")

elif source_url_obj.scheme.startswith("mongodb"):
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")

Check warning on line 150 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L150

Added line #L150 was not covered by tests
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

mongodb_relay_cdc(source_url, target_url, progress=True)
mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)

Check warning on line 153 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L153

Added line #L153 was not covered by tests
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(
source_url,
mongodb_copy_generic(
str(source_url_obj),
target_url,
transformation=transformation,
limit=int(source_url_obj.query.get("limit", 0)),
progress=True,
):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
)
else:
raise NotImplementedError("Importing resource not implemented yet")


def mongodb_copy_generic(
    source_url: str,
    target_url: str,
    transformation: t.Union[Path, None] = None,
    progress: bool = False,
):
    """
    Run `mongodb_copy` for the given source and target, and fail loudly.

    :param source_url: Source address, forwarded unchanged to `mongodb_copy`.
    :param target_url: Target address, forwarded unchanged to `mongodb_copy`.
    :param transformation: Optional transformation path, forwarded as keyword argument.
    :param progress: Progress flag, forwarded as keyword argument.
    :raises OperationFailed: When `mongodb_copy` returns a falsy result.
    """
    # The copy routine is imported at call time, not at module import time.
    from cratedb_toolkit.io.mongodb.api import mongodb_copy

    succeeded = mongodb_copy(source_url, target_url, transformation=transformation, progress=progress)
    if succeeded:
        return

    # A falsy outcome is logged first, then surfaced as an exception.
    msg = "Data loading failed"
    logger.error(msg)
    raise OperationFailed(msg)

Check warning on line 178 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L176-L178

Added lines #L176 - L178 were not covered by tests
134 changes: 134 additions & 0 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import itertools
import logging
import typing as t
from abc import abstractmethod
from functools import cached_property
from pathlib import Path

import boltons.urlutils
import polars as pl
import pymongo
import yarl
from attrs import define, field
from boltons.urlutils import URL
from bson.raw_bson import RawBSONDocument
from undatum.common.iterable import IterableData

from cratedb_toolkit.io.mongodb.util import batches
from cratedb_toolkit.model import DatabaseAddress

logger = logging.getLogger(__name__)


@define
class MongoDBAdapterBase:
address: DatabaseAddress
effective_url: URL
database_name: str
collection_name: str

_custom_query_parameters = ["batch-size", "limit", "offset"]

@classmethod
def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
    """
    Alternate constructor: build an adapter instance from a URL.

    The URL is coerced to a string, parsed with `DatabaseAddress.from_string`,
    and decoded into a URI object plus a collection address. The collection
    address provides the database name (`schema`) and collection name (`table`).
    All names listed in `_custom_query_parameters` are removed from the
    decoded URI's query parameters before it is stored as `effective_url`.
    """
    url_text = url if isinstance(url, str) else str(url)

    address = DatabaseAddress.from_string(url_text)
    effective_url, mongodb_collection_address = address.decode()
    logger.info(f"Collection address: {mongodb_collection_address}")

    # Drop the adapter-specific options from the decoded URI.
    for name in cls._custom_query_parameters:
        effective_url.query_params.pop(name, None)

    return cls(
        address=address,
        effective_url=effective_url,
        database_name=mongodb_collection_address.schema,
        collection_name=mongodb_collection_address.table,
    )

def __attrs_post_init__(self):
self.setup()

@cached_property
def batch_size(self) -> int:
return int(self.address.uri.query_params.get("batch-size", 500))

@cached_property
def limit(self) -> int:
return int(self.address.uri.query_params.get("limit", 0))

@cached_property
def offset(self) -> int:
return int(self.address.uri.query_params.get("offset", 0))

@abstractmethod
def setup(self):
raise NotImplementedError()

Check warning on line 67 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L67

Added line #L67 was not covered by tests

@abstractmethod
def record_count(self, filter_=None) -> int:
raise NotImplementedError()

Check warning on line 71 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L71

Added line #L71 was not covered by tests

@abstractmethod
def query(self):
raise NotImplementedError()

Check warning on line 75 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L75

Added line #L75 was not covered by tests


@define
class MongoDBFileAdapter(MongoDBAdapterBase):
_path: Path = field(init=False)

def setup(self):
self._path = Path(self.address.uri.path)

def record_count(self, filter_=None) -> int:
    """
    Count the newline bytes in the file at `self._path`.

    The file is read unbuffered, in binary mode, in chunks of 1 MiB, so the
    whole file is never held in memory at once. The file handle is closed
    deterministically when counting finishes or fails.

    https://stackoverflow.com/a/27517681

    :param filter_: Accepted for interface compatibility; not used here.
    :return: Number of `\\n` bytes found in the file.
    """
    chunk_size = 1024 * 1024
    newline_count = 0
    # `buffering=0` yields the raw stream directly, matching the previous `f.raw.read()` usage.
    with open(self._path, "rb", buffering=0) as stream:
        # `iter(callable, sentinel)` stops at the first empty read, i.e. at end of file.
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            newline_count += chunk.count(b"\n")
    return newline_count

def query(self):
if not self._path.exists():
raise FileNotFoundError(f"Resource not found: {self._path}")

Check warning on line 95 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L95

Added line #L95 was not covered by tests
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")

Check warning on line 97 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L97

Added line #L97 was not covered by tests
if self._path.suffix in [".ndjson", ".jsonl"]:
data = pl.read_ndjson(self._path, batch_size=self.batch_size, n_rows=self.limit or None).to_dicts()
elif ".bson" in str(self._path):
data = IterableData(str(self._path), options={"format_in": "bson"}).iter()
else:
raise ValueError(f"Unsupported file type: {self._path.suffix}")

Check warning on line 103 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L103

Added line #L103 was not covered by tests
return batches(data, self.batch_size)


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
    """
    Adapter that reads documents from a MongoDB server collection using pymongo.
    """

    _mongodb_client: pymongo.MongoClient = field(init=False)
    _mongodb_collection: pymongo.collection.Collection = field(init=False)

    def setup(self):
        """
        Create the pymongo client from `effective_url`, and resolve the
        collection handle from `database_name` and `collection_name`.
        """
        client = pymongo.MongoClient(
            str(self.effective_url),
            document_class=RawBSONDocument,
            datetime_conversion="DATETIME_AUTO",
        )
        self._mongodb_client = client
        self._mongodb_collection = client[self.database_name][self.collection_name]

    def record_count(self, filter_=None) -> int:
        """
        Return the number of documents matching `filter_`.

        A falsy `filter_` is replaced by an empty filter document.
        """
        criteria = filter_ or {}
        return self._mongodb_collection.count_documents(filter=criteria)

    def query(self):
        """
        Return the collection's documents, grouped by `batches()` using `batch_size`.

        The cursor is configured with the adapter's `batch_size`, `offset` (skip),
        and `limit` values, in that order.
        """
        cursor = self._mongodb_collection.find()
        cursor = cursor.batch_size(self.batch_size)
        cursor = cursor.skip(self.offset)
        cursor = cursor.limit(self.limit)
        return batches(cursor, self.batch_size)


def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
    """
    Select and construct the adapter matching the URL scheme.

    Schemes starting with `file` produce a `MongoDBFileAdapter`, schemes
    starting with `mongodb` produce a `MongoDBServerAdapter`.

    :raises ValueError: When the scheme matches neither prefix.
    """
    scheme = mongodb_uri.scheme
    if scheme.startswith("file"):
        return MongoDBFileAdapter.from_url(mongodb_uri)
    if scheme.startswith("mongodb"):
        return MongoDBServerAdapter.from_url(mongodb_uri)
    raise ValueError("Unable to create MongoDB adapter")

Check warning on line 134 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L134

Added line #L134 was not covered by tests
21 changes: 6 additions & 15 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import logging
import typing as t
from pathlib import Path

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
Expand Down Expand Up @@ -41,12 +42,11 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi
# 1. Extract schema from MongoDB collection.
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
extract_args = argparse.Namespace(
url=str(mongodb_uri),
url=str(mongodb_uri) + f"&limit={limit}",
database=mongodb_database,
collection=mongodb_collection,
scan="partial",
transformation=transformation,
limit=limit,
)
mongodb_schema = extract(extract_args)
count = mongodb_schema[mongodb_collection]["count"]
Expand Down Expand Up @@ -75,19 +75,18 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(
url=str(mongodb_uri),
url=str(mongodb_uri) + f"&limit={limit}",
database=mongodb_database,
collection=mongodb_collection,
transformation=transformation,
limit=limit,
)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

return True


def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = None, progress: bool = False):
"""
Transfer MongoDB collection using translator component.
Expand All @@ -97,13 +96,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int
ctk load table mongodb://localhost:27017/testdrive/demo
"""

# Decode database URL.
mongodb_address = DatabaseAddress.from_string(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table

logger.info(f"Invoking MongoDBFullLoad. mongodb_uri={mongodb_uri}")
logger.info(f"Invoking MongoDBFullLoad. source_url={source_url}")

# Optionally configure transformations.
tm = None
Expand All @@ -112,9 +105,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int

# Invoke `full-load` procedure.
mdb_full = MongoDBFullLoad(
mongodb_url=str(mongodb_uri),
mongodb_database=mongodb_database,
mongodb_collection=mongodb_collection,
mongodb_url=source_url,
cratedb_url=target_url,
tm=tm,
progress=progress,
Expand Down
Loading

0 comments on commit cf8213d

Please sign in to comment.