From 9e5246486993ffc2537551dd5f0ecabc665835d7 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 00:08:00 +0200 Subject: [PATCH] MongoDB: Improve URL computation when transferring whole databases --- cratedb_toolkit/io/mongodb/api.py | 42 ++++++++++++++--------------- cratedb_toolkit/model.py | 44 +++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index b201731..83a9f30 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -11,7 +11,7 @@ from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad from cratedb_toolkit.io.mongodb.core import export, extract, translate from cratedb_toolkit.io.mongodb.transform import TransformationManager -from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.model import AddressPair, DatabaseAddress from cratedb_toolkit.util.cr8 import cr8_insert_json from cratedb_toolkit.util.database import DatabaseAdapter @@ -115,8 +115,7 @@ def mongodb_copy( if transformation: tm = TransformationManager(path=transformation) - tasks = [] - + # Check if source address URL includes a table name or not. has_table = True if "*" in source_url.path: has_table = False @@ -125,7 +124,12 @@ def mongodb_copy( if mongodb_collection_address.table is None: has_table = False - # Invoke `full-load` procedure. + # Build list of tasks. Either a single one when transferring a single + # collection into a table, or multiple ones when transferring multiple + # collections. + tasks = [] + + # `full-load` procedure, single collection. if has_table: tasks.append( MongoDBFullLoad( @@ -135,32 +139,26 @@ def mongodb_copy( progress=progress, ) ) + + # `full-load` procedure, multiple collections. else: logger.info(f"Inquiring collections at {source_url}") - mongodb_uri = source_url - cratedb_uri = target_url - # What the hack? - if ( - mongodb_uri.scheme.startswith("mongodb") - and Path(mongodb_uri.path).is_absolute() - and mongodb_uri.path[-1] != "/" - ): - mongodb_uri.path += "/" - if cratedb_uri.path[-1] != "/": - cratedb_uri.path += "/" - mongodb_query_parameters = mongodb_uri.query_params - mongodb_adapter = mongodb_adapter_factory(mongodb_uri) + address_pair_root = AddressPair(source_url=source_url, target_url=target_url) + + mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url) collections = mongodb_adapter.get_collections() logger.info(f"Discovered collections: {len(collections)}") logger.debug(f"Processing collections: {collections}") + for collection_path in collections: - mongodb_uri_effective = mongodb_uri.navigate(Path(collection_path).name) - mongodb_uri_effective.query_params = mongodb_query_parameters - cratedb_uri_effective = cratedb_uri.navigate(Path(collection_path).stem) + address_pair = address_pair_root.navigate( + source_path=Path(collection_path).name, + target_path=Path(collection_path).stem, + ) tasks.append( MongoDBFullLoad( - mongodb_url=mongodb_uri_effective, - cratedb_url=cratedb_uri_effective, + mongodb_url=address_pair.source_url, + cratedb_url=address_pair.target_url, tm=tm, progress=progress, ) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 66f443d..2360ecd 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -1,7 +1,10 @@ import dataclasses import typing as t from copy import deepcopy +from pathlib import Path +from attr import Factory +from attrs import define from boltons.urlutils import URL @@ -120,3 +123,44 @@ class InputOutputResource: url: str format: t.Optional[str] = None # noqa: A003 compression: t.Optional[str] = None + + +@define +class AddressPair: + """ + Manage two URL instances, specifically a pair of source/target URLs, + where target is mostly a CrateDB Server, while source is any. + """ + + source_url: URL + target_url: URL + + _source_url_query_parameters: t.Dict[str, t.Any] = Factory(dict) + _target_url_query_parameters: t.Dict[str, t.Any] = Factory(dict) + + __SERVER_SCHEMES__ = ["http", "https", "mongodb", "mongodb+srv"] + + def navigate(self, source_path: str, target_path: str) -> "AddressPair": + source_url_query_parameters = self.source_url.query_params + target_url_query_parameters = self.target_url.query_params + + source_url = URL(str(self.source_url)) + target_url = URL(str(self.target_url)) + + # Q: What the hack? + # A: It makes subsequent `.navigate()` operations work. + if ( + source_url.scheme in self.__SERVER_SCHEMES__ + and Path(source_url.path).is_absolute() + and source_url.path[-1] != "/" + ): + source_url.path += "/" + if target_url.path[-1] != "/": + target_url.path += "/" + + source_url = source_url.navigate(f"./{source_path}") + source_url.query_params = source_url_query_parameters + target_url = target_url.navigate(f"./{target_path}") + target_url.query_params = target_url_query_parameters + + return AddressPair(source_url, target_url)