From a9004030c57cfa580bdce2f0a2b8a691cfad7f88 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 15 Nov 2023 08:35:39 +0100 Subject: [PATCH] Cloud API: Naming things, improved UX, more OO - Naming things: Use more appropriate names for variables. - OO: Add `EnvironmentConfiguration`, to manage information about the toolkit environment. - OO: Add `CloudJob` entity, to manage information about a cloud job, and to streamline passing of information. - OO: Refactor low-level methods to `CloudCluster`. - Capability to configure error handling based on the environment. - Capability to conveniently acquire configuration settings from the environment, for obtaining the CrateDB Cluster identifier or name. - Tests: Add `DockerSkippingContainer` and `PytestTestcontainerAdapter`. Both skip test execution when the Docker daemon is not running, or not available to the environment. - Examples: Improve UX of `cloud_*.py` example programs. --- cratedb_toolkit/__init__.py | 4 + cratedb_toolkit/api/main.py | 181 +++++++++++---- cratedb_toolkit/cluster/croud.py | 93 +++++++- cratedb_toolkit/config.py | 39 ++++ cratedb_toolkit/io/cli.py | 6 +- cratedb_toolkit/io/croud.py | 214 +++++++++--------- cratedb_toolkit/job/croud.py | 4 + .../testing/testcontainers/cratedb.py | 8 +- .../testing/testcontainers/influxdb2.py | 5 +- .../testing/testcontainers/minio.py | 6 +- .../testing/testcontainers/mongodb.py | 9 +- .../testing/testcontainers/util.py | 73 ++++++ cratedb_toolkit/util/__init__.py | 1 + cratedb_toolkit/util/basic.py | 49 ---- cratedb_toolkit/util/common.py | 8 + cratedb_toolkit/util/croud.py | 17 +- cratedb_toolkit/util/runtime.py | 70 ++++++ cratedb_toolkit/util/setting.py | 154 +++++++++++++ examples/cloud_cluster.py | 98 ++++---- examples/cloud_import.py | 109 +++++---- pyproject.toml | 7 +- tests/conftest.py | 29 +-- tests/io/influxdb/conftest.py | 11 +- tests/io/mongodb/conftest.py | 11 +- tests/io/test_import.py | 2 +- tests/retention/test_cli.py | 7 +- tests/retention/test_examples.py | 17 +- 27 files changed, 881 insertions(+), 351 deletions(-) create mode 100644 cratedb_toolkit/config.py delete mode 100644 cratedb_toolkit/util/basic.py create mode 100644 cratedb_toolkit/util/runtime.py create mode 100644 cratedb_toolkit/util/setting.py diff --git a/cratedb_toolkit/__init__.py b/cratedb_toolkit/__init__.py index c37fc8d9..c5578d73 100644 --- a/cratedb_toolkit/__init__.py +++ b/cratedb_toolkit/__init__.py @@ -9,3 +9,7 @@ __version__ = version(__appname__) except PackageNotFoundError: # pragma: no cover __version__ = "unknown" + +from .api import ManagedCluster # noqa: F401 +from .config import configure # noqa: F401 +from .model import InputOutputResource, TableAddress # noqa: F401 diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index fabceb3f..761ace6d 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -2,86 +2,166 @@ import dataclasses import json import logging +import sys import typing as t from abc import abstractmethod from cratedb_toolkit.api.guide import GuidingTexts from cratedb_toolkit.cluster.util import deploy_cluster, get_cluster_by_name, get_cluster_info +from cratedb_toolkit.config import CONFIG from cratedb_toolkit.exception import CroudException, OperationFailed -from cratedb_toolkit.io.croud import CloudIo +from cratedb_toolkit.io.croud import CloudIo, CloudJob from cratedb_toolkit.model import ClusterInformation, DatabaseAddress, InputOutputResource, TableAddress +from cratedb_toolkit.util.runtime import flexfun +from cratedb_toolkit.util.setting import RequiredMutuallyExclusiveSettingsGroup, Setting logger = logging.getLogger(__name__) class ClusterBase(abc.ABC): @abstractmethod - def load_table(self, resource: InputOutputResource, target: TableAddress): + def load_table(self, source: InputOutputResource, target: TableAddress): raise NotImplementedError("Child class needs to implement this method") -@dataclasses.dataclass class ManagedCluster(ClusterBase): """ Wrap a managed CrateDB database cluster on CrateDB Cloud. """ - cloud_id: t.Optional[str] = None - name: t.Optional[str] = None - address: t.Optional[DatabaseAddress] = None - info: t.Optional[ClusterInformation] = None - exists: bool = False + settings_spec = RequiredMutuallyExclusiveSettingsGroup( + Setting( + name="--cluster-id", + envvar="CRATEDB_CLOUD_CLUSTER_ID", + help="The CrateDB Cloud cluster identifier, an UUID", + ), + Setting( + name="--cluster-name", + envvar="CRATEDB_CLOUD_CLUSTER_NAME", + help="The CrateDB Cloud cluster name", + ), + ) + + def __init__( + self, + id: str = None, # noqa: A002 + name: str = None, + address: DatabaseAddress = None, + info: ClusterInformation = None, + ): + self.id = id + self.name = name + self.address = address + self.info: ClusterInformation = info or ClusterInformation() + self.exists: bool = False + if self.id is None and self.name is None: + raise ValueError("Failed to address cluster: Either cluster identifier or name needs to be specified") + + @classmethod + @flexfun(domain="settings") + def from_env(cls) -> "ManagedCluster": + """ + Obtain CrateDB Cloud cluster identifier or name from user environment. + The settings are mutually exclusive. - def __post_init__(self): - logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id} ({self.name})") - self.probe() + When the toolkit environment is configured with `settings_accept_cli`, + the settings can be specified that way: + + --cluster-id=e1e38d92-a650-48f1-8a70-8133f2d5c400 + --cluster-name=Hotzenplotz + + When the toolkit environment is configured with `settings_accept_env`, + the settings can be specified that way: + + export CRATEDB_CLOUD_CLUSTER_ID=e1e38d92-a650-48f1-8a70-8133f2d5c400 + export CRATEDB_CLOUD_CLUSTER_NAME=Hotzenplotz + """ + if not CONFIG.settings_accept_cli or not CONFIG.settings_accept_env: + raise ValueError( + "Unable to obtain cluster identifier or name without accepting settings from user environment" + ) + try: + cluster_id, cluster_name = cls.settings_spec.obtain_settings() + return cls(id=cluster_id, name=cluster_name) + + # TODO: With `flexfun`, can this section be improved? + except ValueError as ex: + logger.error(f"Failed to address cluster: {ex}") + if CONFIG.settings_errors == "exit": + sys.exit(1) + else: + raise + + def stop(self) -> "ManagedCluster": + logger.warning("Stopping cluster not implemented yet") + return self + + def delete(self) -> "ManagedCluster": + return self - def probe(self): + def probe(self) -> "ManagedCluster": """ Probe a CrateDB Cloud cluster, API-wise. """ - if not self.cloud_id and not self.name: - self.exists = False - raise ValueError("Either cluster identifier or name needs to be specified") try: - if self.cloud_id: - self.info = get_cluster_info(cluster_id=self.cloud_id) + if self.id: + self.info = get_cluster_info(cluster_id=self.id) self.name = self.info.cloud["name"] - else: + elif self.name: self.info = get_cluster_by_name(self.name) - self.cloud_id = self.info.cloud["id"] + self.id = self.info.cloud["id"] + else: + self.exists = False + raise ValueError("Failed to address cluster: Either cluster identifier or name needs to be specified") except CroudException as ex: self.exists = False if "Cluster not found" not in str(ex): raise - if self.info: + if self.info.cloud: self.exists = True logger.info(f"Cluster information: name={self.info.cloud.get('name')}, url={self.info.cloud.get('url')}") + return self - def acquire(self): + @flexfun(domain="runtime") + def start(self) -> "ManagedCluster": + """ + Start a database cluster. + When cluster does not exist, acquire/deploy it. + """ + logger.info(f"Deploying/starting/resuming CrateDB Cloud Cluster: id={self.id}, name={self.name}") + self.acquire() + return self + + def acquire(self) -> "ManagedCluster": """ Acquire a database cluster. - When cluster does not exist, deploy it. + This means going through the steps of deploy and/or start, as applicable. + + - When cluster does not exist, create/deploy it. + - When a cluster exists, but is stopped/hibernated, start/resume it. """ + self.probe() if not self.exists: - logger.info(f"Cluster does not exist: {self.name}") - logger.info(f"Deploying cluster: {self.name}") - if self.deploy(): - self.probe() - else: + logger.info(f"Cluster does not exist, deploying it: id={self.id}, name={self.name}") + self.deploy() + self.probe() + if not self.exists: + # TODO: Is it possible to gather and propagate more information why the deployment failed? raise CroudException(f"Deployment of cluster failed: {self.name}") + return self - def deploy(self): + def deploy(self) -> "ManagedCluster": """ Run the cluster deployment procedure. """ - try: - deploy_cluster(self.name) - except CroudException: - return False - return True - - def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None): + # FIXME: Accept id or name. + if self.name is None: + raise ValueError("Need cluster name to deploy") + deploy_cluster(self.name) + return self + + @flexfun(domain="runtime") + def load_table(self, source: InputOutputResource, target: t.Optional[TableAddress] = None) -> CloudJob: """ Load data into a database table on CrateDB Cloud. @@ -93,33 +173,36 @@ def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddr https://console.cratedb.cloud """ + self.probe() target = target or TableAddress() + # FIXME: Accept id or name. + if self.id is None: + raise ValueError("Need cluster identifier to load table") + try: - cio = CloudIo(cluster_id=self.cloud_id) + cio = CloudIo(cluster_id=self.id) except CroudException as ex: - msg = f"Connecting to cluster resource failed: {self.cloud_id}. Reason: {ex}" - if "Resource not found" in str(ex): - logger.error(msg) - return None, False + msg = f"Connecting to cluster resource failed: {self.id}. Reason: {ex}" logger.exception(msg) raise OperationFailed(msg) from ex try: - job_info, success = cio.load_resource(resource=resource, target=target) - logger.info("Job information:\n%s", json.dumps(job_info, indent=2)) + cloud_job = cio.load_resource(resource=source, target=target) + logger.info("Job information:\n%s", json.dumps(cloud_job.info, indent=2)) # TODO: Explicitly report about `failed_records`, etc. texts = GuidingTexts( admin_url=self.info.cloud["url"], - table_name=job_info["destination"]["table"], + table_name=cloud_job.info["destination"]["table"], ) - if success: + if cloud_job.success: logger.info("Data loading was successful: %s", texts.success()) - return job_info, success + return cloud_job else: # TODO: Add "reason" to exception message. - logger.error(f"Data loading failed: {texts.error()}") - raise OperationFailed("Data loading failed") + message = f"Data loading failed: {cloud_job.message}" + logger.error(f"{message}{texts.error()}") + raise OperationFailed(message) # When exiting so, it is expected that error logging has taken place appropriately. except CroudException as ex: @@ -137,7 +220,7 @@ class StandaloneCluster(ClusterBase): address: DatabaseAddress info: t.Optional[ClusterInformation] = None - def load_table(self, resource: InputOutputResource, target: TableAddress): + def load_table(self, source: InputOutputResource, target: TableAddress): """ Load data into a database table on a standalone CrateDB Server. @@ -148,7 +231,7 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): ctk load table influxdb2://example:token@localhost:8086/testdrive/demo ctk load table mongodb://localhost:27017/testdrive/demo """ - source_url = resource.url + source_url = source.url target_url = self.address.dburi if source_url.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy diff --git a/cratedb_toolkit/cluster/croud.py b/cratedb_toolkit/cluster/croud.py index 21b8f8ec..6fc81881 100644 --- a/cratedb_toolkit/cluster/croud.py +++ b/cratedb_toolkit/cluster/croud.py @@ -1,11 +1,14 @@ import json +import typing as t +from pathlib import Path +from cratedb_toolkit.model import InputOutputResource, TableAddress from cratedb_toolkit.util.croud import CroudCall, CroudWrapper class CloudManager: """ - A wrapper around the CrateDB Cloud cluster API through the `croud` package. + A wrapper around the CrateDB Cloud API through the `croud` package, providing common methods. """ def get_cluster_list(self): @@ -71,8 +74,12 @@ def deploy_cluster(self, name: str, project_id: str): croud clusters get e1e38d92-a650-48f1-8a70-8133f2d5c400 --format=json """ # noqa: E501 + # TODO: Use specific subscription, or, if only one exists, use it. + # Alternatively, acquire value from user environment. # TODO: `--product-name=crfree` is not always the right choice. ;] + # TODO: Auto-generate cluster name when not given. # TODO: How to select CrateDB nightly, like `--version=nightly`? + # TODO: Let the user provide the credentials. # TODO: Add more parameters, like `--org-id`, `--channel`, `--unit`, and more. # TODO: What about `--sudo`? from croud.__main__ import command_tree @@ -110,7 +117,7 @@ def deploy_cluster(self, name: str, project_id: str): class CloudCluster: """ - A wrapper around the CrateDB Cloud cluster API through the `croud` package. + A wrapper around the CrateDB Cloud API through the `croud` package, providing methods specific to a cluster. """ def __init__(self, cluster_id: str): @@ -142,3 +149,85 @@ def get_info(self): wr = CroudWrapper(call=call) return wr.invoke() + + def list_jobs(self): + from croud.clusters.commands import import_jobs_list + from croud.parser import Argument + + call = CroudCall( + fun=import_jobs_list, + specs=[Argument("--cluster-id", type=str, required=True, help="The cluster the import jobs belong to.")], + arguments=[ + f"--cluster-id={self.cluster_id}", + ], + ) + + wr = CroudWrapper(call=call) + return wr.invoke() + + def create_import_job(self, resource: InputOutputResource, target: TableAddress) -> t.Dict[str, t.Any]: + from croud.__main__ import import_job_create_common_args + from croud.clusters.commands import import_jobs_create_from_file, import_jobs_create_from_url + from croud.parser import Argument + + specs: t.List[Argument] = import_job_create_common_args + + url_argument = Argument("--url", type=str, required=True, help="The URL the import file will be read from.") + + file_id_argument = Argument( + "--file-id", + type=str, + required=False, + help="The file ID that will be used for the " + "import. If not specified then --file-path" + " must be specified. " + "Please refer to `croud organizations " + "files` for more info.", + ) + file_path_argument = Argument( + "--file-path", + type=str, + required=False, + help="The file in your local filesystem that " + "will be used. If not specified then " + "--file-id must be specified. " + "Please note the file will become visible " + "under `croud organizations files list`.", + ) + + # Compute command-line arguments for invoking `croud`. + # FIXME: This call is redundant. + path = Path(resource.url) + + # TODO: Sanitize table name. Which characters are allowed? + if path.exists(): + specs.append(file_path_argument) + specs.append(file_id_argument) + arguments = [ + f"--cluster-id={self.cluster_id}", + f"--file-path={resource.url}", + f"--table={target.table}", + f"--file-format={resource.format}", + ] + fun = import_jobs_create_from_file + else: + specs.append(url_argument) + arguments = [ + f"--cluster-id={self.cluster_id}", + f"--url={resource.url}", + f"--table={target.table}", + f"--file-format={resource.format}", + ] + fun = import_jobs_create_from_url + + if resource.compression is not None: + arguments += [f"--compression={resource.compression}"] + + call = CroudCall( + fun=fun, + specs=specs, + arguments=arguments, + ) + + wr = CroudWrapper(call=call) + return wr.invoke() diff --git a/cratedb_toolkit/config.py b/cratedb_toolkit/config.py new file mode 100644 index 00000000..e417b7f4 --- /dev/null +++ b/cratedb_toolkit/config.py @@ -0,0 +1,39 @@ +import dataclasses + + +@dataclasses.dataclass +class EnvironmentConfiguration: + """ + Manage information about the toolkit environment. + """ + + frontend_interactive: bool = True + frontend_guidance: bool = True + runtime_errors: str = "raise" + settings_accept_cli: bool = False + settings_accept_env: bool = False + settings_errors: str = "raise" + + +# The global toolkit environment. +# TODO: Provide as context manager. +CONFIG = EnvironmentConfiguration() + + +def configure( + frontend_interactive: bool = True, + frontend_guidance: bool = True, + runtime_errors: str = "raise", + settings_accept_cli: bool = False, + settings_accept_env: bool = False, + settings_errors: str = "raise", +): + """ + Configure the toolkit environment. + """ + CONFIG.frontend_interactive = frontend_interactive + CONFIG.frontend_guidance = frontend_guidance + CONFIG.runtime_errors = runtime_errors + CONFIG.settings_accept_cli = settings_accept_cli + CONFIG.settings_accept_env = settings_accept_env + CONFIG.settings_errors = settings_errors diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 6c866e58..a226c1e4 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -71,15 +71,15 @@ def load_table( raise KeyError(error_message) # Encapsulate source and target parameters. - resource = InputOutputResource(url=url, format=format_, compression=compression) + source = InputOutputResource(url=url, format=format_, compression=compression) target = TableAddress(schema=schema, table=table) # Dispatch "load table" operation. cluster: ClusterBase if cluster_id: - cluster = ManagedCluster(cloud_id=cluster_id) + cluster = ManagedCluster(id=cluster_id) elif address: cluster = StandaloneCluster(address=address) else: raise NotImplementedError("Unable to select backend") - return cluster.load_table(resource=resource, target=target) + return cluster.load_table(source=source, target=target) diff --git a/cratedb_toolkit/io/croud.py b/cratedb_toolkit/io/croud.py index 1bddbfb3..5396fbe0 100644 --- a/cratedb_toolkit/io/croud.py +++ b/cratedb_toolkit/io/croud.py @@ -1,11 +1,11 @@ +import dataclasses import logging import time import typing as t from pathlib import Path -from cratedb_toolkit.job.croud import jobs_list +from cratedb_toolkit.cluster.croud import CloudCluster from cratedb_toolkit.model import InputOutputResource, TableAddress -from cratedb_toolkit.util.croud import CroudCall, CroudWrapper logger = logging.getLogger(__name__) @@ -19,6 +19,73 @@ class CloudIoSpecs: allowed_formats = ["csv", "json", "parquet"] +@dataclasses.dataclass +class CloudJob: + """ + Manage information about a cloud job. + + It gives quick access to important attributes, without needing to decode + nested dictionaries. + """ + + info: t.Dict = dataclasses.field(default_factory=dict) + found: bool = False + _custom_status: t.Optional[str] = None + _custom_message: t.Optional[str] = None + + def __post_init__(self): + self.fix_job_info_table_name() + + @classmethod + def unknown(cls, message: str): + cj = cls() + cj._custom_message = message + cj._custom_status = "UNKNOWN" + return cj + + @property + def id(self): # noqa: A003 + return self.info["id"] + + @property + def status(self): + if self._custom_status: + return self._custom_status + return self.info["status"] + + @property + def success(self): + return self.status == "SUCCEEDED" + + @property + def message(self): + if self._custom_message: + return self._custom_message + return self.info["progress"]["message"] + + def fix_job_info_table_name(self): + """ + Adjust full-qualified table name by adding appropriate quotes. + Fixes a minor flaw on the upstream API. + + Currently, the API returns `testdrive.pems-1`, but that can not be used at + all, because it is not properly quoted. It also can not be used 1:1, because + it is not properly quoted. + + So, converge the table name into `"testdrive"."pems-1"` manually, for a + full-qualified representation. + + FIXME: Remove after upstream has fixed the flaw. + """ + job_info = self.info + if "destination" in job_info and "table" in job_info["destination"]: + table = job_info["destination"]["table"] + if '"' not in table and "." in table: + schema, table = table.split(".") + table = f'"{schema}"."{table}"' + job_info["destination"]["table"] = table + + class CloudIo: """ Wrap access to CrateDB Cloud Import API. @@ -26,10 +93,13 @@ class CloudIo: def __init__(self, cluster_id: str): self.cluster_id = cluster_id + self.cluster = CloudCluster(cluster_id=cluster_id) - def load_resource(self, resource: InputOutputResource, target: TableAddress) -> t.Tuple[t.Dict, bool]: + def load_resource(self, resource: InputOutputResource, target: TableAddress) -> CloudJob: """ Load resource from URL into CrateDB, using CrateDB Cloud infrastructure. + + TODO: Refactor return value to use dedicated type. """ # Use `schema` and `table` when given, otherwise derive from input URL. @@ -39,86 +109,53 @@ def load_resource(self, resource: InputOutputResource, target: TableAddress) -> logger.info(f"Loading data. source={resource}, target={target}") import_job = self.create_import_job(resource=resource, target=target) - job_id = import_job["id"] + job_id = import_job.id + # TODO: Review this. time.sleep(0.15) - outcome, success, found = self.find_job(job_id=job_id) - if not found: - logger.error(f"Job not found: {job_id}") - if not outcome: - outcome = import_job - return outcome, success - - def find_job(self, job_id: str) -> t.Tuple[t.Dict, bool, bool]: + cloud_job = self.find_job(job_id=job_id) + if not cloud_job.found: + logger.error(cloud_job.message) + if not cloud_job.info: + cloud_job.info = import_job.info + return cloud_job + + def find_job(self, job_id: str) -> CloudJob: """ Find CrateDB Cloud job by identifier. """ - jobs = jobs_list(self.cluster_id) - - found = False - success = False - job_info: t.Dict = {} - for job in jobs: - if job["id"] == job_id: - found = True - job_info = job - status = job["status"] - message = job["progress"]["message"] - message = f"{message} (status: {status})" - if status == "SUCCEEDED": - success = True - logger.info(message) + for job in self.list_jobs(): + if job.id == job_id: + log_message = f"{job.message} (status: {job.status})" + if job.success: + logger.info(log_message) else: - logger.error(message) - break + logger.error(log_message) + return job - fix_job_info_table_name(job_info) + return CloudJob.unknown(message=f"Job was not created: {job_id}") - return job_info, success, found + def list_jobs(self) -> t.Generator[CloudJob, None, None]: + """ + Inquire API for a list of cloud jobs. + """ + for job_dict in self.cluster.list_jobs(): + yield CloudJob(info=job_dict, found=True) - def create_import_job(self, resource: InputOutputResource, target: TableAddress) -> t.Dict[str, t.Any]: + def create_import_job(self, resource: InputOutputResource, target: TableAddress) -> CloudJob: """ Create CrateDB Cloud import job, using resource on filesystem or URL. croud clusters import-jobs create from-url --cluster-id e1e38d92-a650-48f1-8a70-8133f2d5c400 \ --file-format csv --table my_table_name --url https://s3.amazonaws.com/my.import.data.gz --compression gzip """ - from croud.__main__ import import_job_create_common_args - from croud.clusters.commands import import_jobs_create_from_file, import_jobs_create_from_url - from croud.parser import Argument - - specs: t.List[Argument] = import_job_create_common_args - arguments = [] - - url_argument = Argument("--url", type=str, required=True, help="The URL the import file will be read from.") - - file_id_argument = Argument( - "--file-id", - type=str, - required=False, - help="The file ID that will be used for the " - "import. If not specified then --file-path" - " must be specified. " - "Please refer to `croud organizations " - "files` for more info.", - ) - file_path_argument = Argument( - "--file-path", - type=str, - required=False, - help="The file in your local filesystem that " - "will be used. If not specified then " - "--file-id must be specified. " - "Please note the file will become visible " - "under `croud organizations files list`.", - ) # Compute command-line arguments for invoking `croud`. + # FIXME: This call is redundant. path = Path(resource.url) # Honor `schema` argument. - table = target.table if target.schema is not None: - table = f'"{target.schema}"."{target.table}"' + target.table = f'"{target.schema}"."{target.table}"' if resource.compression is None: if ".gz" in path.suffixes or ".gzip" in path.suffixes: @@ -144,53 +181,4 @@ def create_import_job(self, resource: InputOutputResource, target: TableAddress) f"Use one of: {CloudIoSpecs.allowed_formats}" ) - # TODO: Sanitize table name. Which characters are allowed? - if path.exists(): - specs.append(file_path_argument) - specs.append(file_id_argument) - arguments = [ - f"--cluster-id={self.cluster_id}", - f"--file-path={resource.url}", - f"--table={table}", - f"--file-format={resource.format}", - ] - fun = import_jobs_create_from_file - else: - specs.append(url_argument) - arguments = [ - f"--cluster-id={self.cluster_id}", - f"--url={resource.url}", - f"--table={table}", - f"--file-format={resource.format}", - ] - fun = import_jobs_create_from_url - - if resource.compression is not None: - arguments += [f"--compression={resource.compression}"] - - call = CroudCall( - fun=fun, - specs=specs, - arguments=arguments, - ) - - wr = CroudWrapper(call=call) - job_info = wr.invoke() - - # Adjust full-qualified table name by adding appropriate quotes. - # FIXME: Remove after flaw has been fixed. - fix_job_info_table_name(job_info) - - return job_info - - -def fix_job_info_table_name(job_info: t.Dict[str, t.Any]): - # FIXME: Remove after upstream has fixed the flaw. - # Currently, the API returns `testdrive.pems-1`, but that can not be used at all. - # So, converge it into `"testdrive"."pems-1"`. - if "destination" in job_info and "table" in job_info["destination"]: - table = job_info["destination"]["table"] - if '"' not in table and "." in table: - schema, table = table.split(".") - table = f'"{schema}"."{table}"' - job_info["destination"]["table"] = table + return CloudJob(info=self.cluster.create_import_job(resource=resource, target=target)) diff --git a/cratedb_toolkit/job/croud.py b/cratedb_toolkit/job/croud.py index 611477df..6f9ee979 100644 --- a/cratedb_toolkit/job/croud.py +++ b/cratedb_toolkit/job/croud.py @@ -2,6 +2,10 @@ def jobs_list(cratedb_cloud_cluster_id: str, output_format: str = None, decode_output: bool = True): + """ + List jobs in different output format than JSON. + TODO: Can `foobar.list_jobs()` be used instead? + """ from croud.clusters.commands import import_jobs_list from croud.parser import Argument diff --git a/cratedb_toolkit/testing/testcontainers/cratedb.py b/cratedb_toolkit/testing/testcontainers/cratedb.py index 923c1976..a9281582 100644 --- a/cratedb_toolkit/testing/testcontainers/cratedb.py +++ b/cratedb_toolkit/testing/testcontainers/cratedb.py @@ -18,12 +18,12 @@ from testcontainers.core.generic import DbContainer from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs -from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer, asbool +from cratedb_toolkit.testing.testcontainers.util import DockerSkippingContainer, KeepaliveContainer, asbool logger = logging.getLogger(__name__) -class CrateDBContainer(KeepaliveContainer, DbContainer): +class CrateDBContainer(DockerSkippingContainer, KeepaliveContainer, DbContainer): """ CrateDB database container. @@ -34,7 +34,7 @@ class CrateDBContainer(KeepaliveContainer, DbContainer): .. doctest:: - >>> from tests.testcontainers.cratedb import CrateDBContainer + >>> from cratedb_toolkit.testing.testcontainers.cratedb import CrateDBContainer >>> import sqlalchemy >>> cratedb_container = CrateDBContainer("crate:5.2.3") @@ -84,9 +84,11 @@ def _configure(self) -> None: self.with_env("CRATEDB_PASSWORD", self.CRATEDB_PASSWORD) self.with_env("CRATEDB_DB", self.CRATEDB_DB) + @wait_container_is_ready() def get_connection_url(self, host=None) -> str: # TODO: When using `db_name=self.CRATEDB_DB`: # Connection.__init__() got an unexpected keyword argument 'database' + wait_for_logs(self, predicate="o.e.n.Node.*started", timeout=MAX_TRIES) return super()._create_connection_url( dialect=self.dialect, username=self.CRATEDB_USER, diff --git a/cratedb_toolkit/testing/testcontainers/influxdb2.py b/cratedb_toolkit/testing/testcontainers/influxdb2.py index 8bc490bd..734a4fa0 100644 --- a/cratedb_toolkit/testing/testcontainers/influxdb2.py +++ b/cratedb_toolkit/testing/testcontainers/influxdb2.py @@ -17,10 +17,10 @@ from testcontainers.core.generic import DbContainer from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs -from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer +from cratedb_toolkit.testing.testcontainers.util import DockerSkippingContainer, KeepaliveContainer -class InfluxDB2Container(KeepaliveContainer, DbContainer): +class InfluxDB2Container(DockerSkippingContainer, KeepaliveContainer, DbContainer): """ InfluxDB database container. @@ -63,6 +63,7 @@ def _configure(self) -> None: self.with_env("DOCKER_INFLUXDB_INIT_BUCKET", "default") self.with_env("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN", self.TOKEN) + @wait_container_is_ready() def get_connection_url(self, host=None) -> str: return super()._create_connection_url( dialect="http", diff --git a/cratedb_toolkit/testing/testcontainers/minio.py b/cratedb_toolkit/testing/testcontainers/minio.py index 82ce9ff4..5bfa6fc6 100644 --- a/cratedb_toolkit/testing/testcontainers/minio.py +++ b/cratedb_toolkit/testing/testcontainers/minio.py @@ -12,12 +12,13 @@ # under the License. import typing as t +from testcontainers.core.waiting_utils import wait_container_is_ready from testcontainers.minio import MinioContainer -from cratedb_toolkit.testing.testcontainers.util import ExtendedDockerContainer +from cratedb_toolkit.testing.testcontainers.util import DockerSkippingContainer, ExtendedDockerContainer -class ExtendedMinioContainer(ExtendedDockerContainer, MinioContainer): +class ExtendedMinioContainer(DockerSkippingContainer, ExtendedDockerContainer, MinioContainer): """ An extended Testcontainer for MinIO, emulating AWS S3. @@ -38,6 +39,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + @wait_container_is_ready() def list_object_names(self, bucket_name: str) -> t.List[str]: """ Return list of object names within given bucket. diff --git a/cratedb_toolkit/testing/testcontainers/mongodb.py b/cratedb_toolkit/testing/testcontainers/mongodb.py index 5250980a..cb085875 100644 --- a/cratedb_toolkit/testing/testcontainers/mongodb.py +++ b/cratedb_toolkit/testing/testcontainers/mongodb.py @@ -12,12 +12,13 @@ # under the License. import os +from testcontainers.core.waiting_utils import wait_container_is_ready from testcontainers.mongodb import MongoDbContainer -from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer +from cratedb_toolkit.testing.testcontainers.util import DockerSkippingContainer, KeepaliveContainer -class MongoDbContainerWithKeepalive(KeepaliveContainer, MongoDbContainer): +class MongoDbContainerWithKeepalive(DockerSkippingContainer, KeepaliveContainer, MongoDbContainer): """ A Testcontainer for MongoDB with improved configurability. @@ -39,3 +40,7 @@ def __init__( ) -> None: super().__init__(image=image, **kwargs) self.with_name("testcontainers-mongodb") + + @wait_container_is_ready() + def get_connection_url(self): + return super().get_connection_url() diff --git a/cratedb_toolkit/testing/testcontainers/util.py b/cratedb_toolkit/testing/testcontainers/util.py index 523705ff..18c8d3b6 100644 --- a/cratedb_toolkit/testing/testcontainers/util.py +++ b/cratedb_toolkit/testing/testcontainers/util.py @@ -12,8 +12,12 @@ # under the License. import logging import os +from abc import abstractmethod from typing import Any +import pytest +from docker.errors import DockerException +from docker.models.containers import Container from testcontainers.core.container import DockerContainer logger = logging.getLogger(__name__) @@ -71,6 +75,7 @@ def __init__( *args, **kwargs, ) -> None: + # Configure keepalive mechanism. self.keepalive = self.KEEPALIVE if "keepalive" in kwargs: self.keepalive = kwargs["keepalive"] @@ -134,3 +139,71 @@ def stop(self, **kwargs): logger.info("Stopping container") return super().stop() return None + + +class DockerSkippingContainer(DockerContainer): + """ + Testcontainers: Skip test execution when Docker daemon is down. + + It intercepts `DockerException: Connection aborted` errors and converges + them into `pytest.skip()` invocations. + """ + + def __init__(self, *args, **kwargs): + # Set `_container` attribute early, because parent's `__del__` may access it. + self._container: Container = None + try: + super().__init__(*args, **kwargs) + # Detect when Docker daemon is not running. + # FIXME: Synchronize with `PytestTestcontainerAdapter`. + except DockerException as ex: + if "Connection aborted" in str(ex): + # TODO: Make this configurable through some `pytest_` variable. + raise pytest.skip(reason="Skipping test because Docker is not running", allow_module_level=True) from ex + else: # noqa: RET506 + raise + + +class PytestTestcontainerAdapter: + """ + A little helper wrapping Testcontainer's `DockerContainer` for Pytest. + + It intercepts `DockerException: Connection aborted` errors and converges + them into `pytest.skip()` invocations. + + It provides a convention where child classes need to implement the `setup()` + method. When objects of type `DockerContainer` are created within this + method, `DockerException` errors are intercepted, and Pytest is instructed + to skip the corresponding test case. + """ + + def __init__(self): + self.container: DockerContainer = None + self.run_setup() + + @abstractmethod + def setup(self): + raise NotImplementedError("Must be implemented by child class") + + def run_setup(self): + try: + self.setup() + + # Detect when Docker daemon is not running. + # FIXME: Synchronize with `DockerSkippingContainer`. + except DockerException as ex: + if "Connection aborted" in str(ex): + # TODO: Make this configurable through some `pytest_` variable. + raise pytest.skip( + reason="Skipping test because Docker daemon is not available", allow_module_level=True + ) from ex + else: # noqa: RET506 + raise + + self.start() + + def start(self): + self.container.start() + + def stop(self): + self.container.stop() diff --git a/cratedb_toolkit/util/__init__.py b/cratedb_toolkit/util/__init__.py index 82d02010..4b1737b6 100644 --- a/cratedb_toolkit/util/__init__.py +++ b/cratedb_toolkit/util/__init__.py @@ -1,3 +1,4 @@ from cratedb_toolkit.util.cli import boot_with_dburi # noqa: F401 +from cratedb_toolkit.util.common import setup_logging # noqa: F401 from cratedb_toolkit.util.data import jd # noqa: F401 from cratedb_toolkit.util.database import DatabaseAdapter # noqa: F401 diff --git a/cratedb_toolkit/util/basic.py b/cratedb_toolkit/util/basic.py deleted file mode 100644 index 85819038..00000000 --- a/cratedb_toolkit/util/basic.py +++ /dev/null @@ -1,49 +0,0 @@ -import os -import sys - -from dotenv import find_dotenv, load_dotenv - - -def obtain_cluster_id() -> str: - """ - Obtain the CrateDB Cloud Cluster identifier from the environment. - - - Use first positional argument from command line. - - Fall back to `CRATEDB_CLOUD_CLUSTER_ID` environment variable. - """ - load_dotenv(find_dotenv()) - - try: - cluster_id = sys.argv[1] - except IndexError: - cluster_id = os.environ.get("CRATEDB_CLOUD_CLUSTER_ID") - - if not cluster_id: - raise ValueError( - "Unable to obtain cluster identifier from command line or " - "`CRATEDB_CLOUD_CLUSTER_ID` environment variable" - ) - - return cluster_id - - -def obtain_cluster_name() -> str: - """ - Obtain the CrateDB Cloud Cluster name from the environment. - - - Use first positional argument from command line. - - Fall back to `CRATEDB_CLOUD_CLUSTER_NAME` environment variable. - """ - load_dotenv(find_dotenv()) - - try: - cluster_id = sys.argv[1] - except IndexError: - cluster_id = os.environ.get("CRATEDB_CLOUD_CLUSTER_NAME") - - if not cluster_id: - raise ValueError( - "Unable to obtain cluster name from command line or " "`CRATEDB_CLOUD_CLUSTER_NAME` environment variable" - ) - - return cluster_id diff --git a/cratedb_toolkit/util/common.py b/cratedb_toolkit/util/common.py index 1aad601c..4cd013b3 100644 --- a/cratedb_toolkit/util/common.py +++ b/cratedb_toolkit/util/common.py @@ -1,12 +1,16 @@ # Copyright (c) 2023, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. import logging +import os import colorlog from colorlog.escape_codes import escape_codes def setup_logging(level=logging.INFO, verbose: bool = False): + if os.environ.get("DEBUG"): + level = logging.DEBUG + reset = escape_codes["reset"] log_format = f"%(asctime)-15s [%(name)-36s] %(log_color)s%(levelname)-8s:{reset} %(message)s" @@ -27,3 +31,7 @@ def setup_logging(level=logging.INFO, verbose: bool = False): # Tame Faker spamming the logs. # https://github.com/joke2k/faker/issues/753#issuecomment-491402018 logging.getLogger("faker").setLevel(logging.ERROR) + + +def croak(message: str = "Foo."): + raise ValueError(message) diff --git a/cratedb_toolkit/util/croud.py b/cratedb_toolkit/util/croud.py index 8cb55108..26cb9ffe 100644 --- a/cratedb_toolkit/util/croud.py +++ b/cratedb_toolkit/util/croud.py @@ -142,21 +142,28 @@ def run_croud_fun(fun: t.Callable, with_exceptions: bool = True): def print_fun(levelname: str, *args, **kwargs): level = get_sane_log_level(levelname) message = str(args[0]) - logger.log(level, message) + + # Forward/propagate/emit log message from `croud`, or not. + # Variant 1: Forward original log message 1:1. + # logger.log(level, message) # noqa: ERA001 + # Variant 2: Augment log message: Converge to `DEBUG` level. + logger.log(logging.DEBUG, f"[croud] {levelname.upper():<8}: {message}") + # TODO: Variant 3: Use setting in `EnvironmentConfiguration` to turn forwarding on/off. + if with_exceptions and level >= logging.ERROR: raise CroudException(message) - # Patch all `print_*` functions, and invoke_foo workhorse function. + # Patch all `print_*` functions, and invoke workhorse function. # https://stackoverflow.com/a/46481946 levels = ["debug", "info", "warning", "error", "success"] with contextlib.ExitStack() as stack: for level in levels: p = patch(f"croud.printer.print_{level}", functools.partial(print_fun, level)) stack.enter_context(p) + # TODO: When aiming to disable wait-for-completion. """ - TODO: When aiming to disable wait-for-completion. - p = patch(f"croud.clusters.commands._wait_for_completed_operation") - stack.enter_context(p) + p = patch(f"croud.clusters.commands._wait_for_completed_operation") + stack.enter_context(p) """ return fun() diff --git a/cratedb_toolkit/util/runtime.py b/cratedb_toolkit/util/runtime.py new file mode 100644 index 00000000..a06b1106 --- /dev/null +++ b/cratedb_toolkit/util/runtime.py @@ -0,0 +1,70 @@ +import functools +import logging +import sys + +from cratedb_toolkit.config import CONFIG + +logger = logging.getLogger() + + +def flexfun(domain: str = None): + """ + Function decorator, which honors toolkit environment settings wrt. error handling. + + It is sorting out whether to raise exceptions, or whether to just `exit(1)`. + + This detail important to handle well depending on the runtime environment. It can + either be a standalone program, used on behalf of a library, or within a Jupyter + Notebook. + + Regarding the exit code, let's just select one of 1 or 2. + https://www.gnu.org/software/wget/manual/html_node/Exit-Status.html + + 0 + + No problems occurred. + 1 + + Generic error code. + 2 + + Parse error—for instance, when parsing command-line options, the ‘.wgetrc’ or ‘.netrc’... + + -- https://www.pythontutorial.net/advanced-python/python-decorator-arguments/ + """ + + runtime_error_exit_code = 1 + settings_error_exit_code = 2 + + def decorate(fn): + @functools.wraps(fn) + def wrapper(*args, **kwargs): + try: + return fn(*args, **kwargs) + except Exception as ex: + if domain == "runtime": + if CONFIG.runtime_errors == "raise": + raise + elif CONFIG.runtime_errors == "exit": # noqa: RET506 + sys.exit(runtime_error_exit_code) + else: + raise NotImplementedError( + f"Unknown way to handle runtime errors: {CONFIG.runtime_errors}" + ) from ex + elif domain == "settings": + if CONFIG.settings_errors == "raise": + raise + elif CONFIG.settings_errors == "exit": # noqa: RET506 + logger.error(ex) + sys.exit(settings_error_exit_code) + else: + raise NotImplementedError( + f"Unknown way to handle settings errors: {CONFIG.runtime_errors}" + ) from ex + else: + logger.debug(f"Not suppressing exception on unknown domain: {domain}") + raise + + return wrapper + + return decorate diff --git a/cratedb_toolkit/util/setting.py b/cratedb_toolkit/util/setting.py new file mode 100644 index 00000000..cbbe2640 --- /dev/null +++ b/cratedb_toolkit/util/setting.py @@ -0,0 +1,154 @@ +import argparse +import dataclasses +import logging +import os +import sys +import typing as t + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class Setting: + name: str + envvar: str + help: str # noqa: A003 + + def asdict(self): + return dataclasses.asdict(self) + + +def obtain_setting( + pos: int = None, + name: str = None, + default: t.Any = None, + help: str = None, # noqa: A002 + envvar: str = None, + envprefix: str = None, + errors: str = "raise", + error_message: str = None, + use_dotenv: bool = True, +) -> t.Optional[str]: + """ + Obtain configuration setting from different sources, DWIM-style. + This is the generic workhorse utility variant. + + - Command line argument, in long form. Example: `--foobar=bazqux`. + - Positional argument on command line. Example: `bazqux`. + - Environment variable. Example: `export FOOBAR=bazqux`. + - Environment variable prefix. Example: `export APPNAME_FOOBAR=bazqux`. + """ + + # 0. Specials. + # When no env_name is given, but an env_prefix *and* a CLI arg name are given, + # make up an environment variable name like `--foo` => `APP_FOO`. + if envvar is None and envprefix is not None and name: + if not isinstance(envprefix, str) or not isinstance(name, str): + raise TypeError("Wrong type for arguments `env_prefix` or `cli_argument_name`") + envvar = envprefix.upper() + name.strip("-").replace("-", "_").upper() + + # Optionally collect an error message about the decoding step. + decoding_error = None + + # 1. Decode named argument. + if name is not None: + parser, value = obtain_setting_cli(name=name, default=default, help=help) + if value is not None: + return value + + # 2. Decode positional argument. + if pos is not None and not argv_has_long_option(): + try: + return sys.argv[pos] + except IndexError as ex: + decoding_error = f"{ex.__class__.__name__}: {ex}" + + # 3. Decode environment variable. + if envvar is not None: + if use_dotenv: + from dotenv import find_dotenv, load_dotenv + + load_dotenv(find_dotenv()) + try: + return os.environ[envvar] + except KeyError as ex: + decoding_error = f"{ex.__class__.__name__}: {ex}" + + # 4. Prepare error reporting. + if error_message is None: + error_message = ( + f"Unable to obtain configuration setting from " + f"command line argument `{name}`, at " + f"argv position `{pos}`, or through the " + f"environment variable `{envvar}`: {decoding_error}" + ) + + # 5. Report about decoding errors, or propagate them as exception. + if errors != "ignore": + logger.warning(error_message) + + # When the ArgumentParser is involved, use its help message. + if name: + logger.warning(f"Unable to decode command line options, see usage information:\n{parser.format_help()}") + + # By default, raise an exception when no value could be obtained. + if errors == "raise": # noqa: RET503 + raise ValueError(error_message) + + return None + + +def obtain_setting_cli( + name: str, default: str = None, help: str = None # noqa: A002 +) -> t.Tuple[argparse.ArgumentParser, t.Optional[str]]: + """ + Obtain a command line argument value from `sys.argv`. + """ + parser = argparse.ArgumentParser(exit_on_error=False) + arg = parser.add_argument(name, default=default, help=help) + namespace, args = parser.parse_known_args() + return parser, getattr(namespace, arg.dest) + + +def argv_has_long_option() -> bool: + """ + Whether the command line contains any "long options" argument. + """ + return any("--" in arg for arg in sys.argv[1:]) + + +class RequiredMutuallyExclusiveSettingsGroup: + """ + Obtain configuration settings from different sources, DWIM-style. + This variant provides grouping, in terms of mutual exclusiveness of multiple settings. + + It has been inspired by click-option-group's RequiredMutuallyExclusiveOptionGroup. + https://github.com/click-contrib/click-option-group + """ + + def __init__(self, *settings, message_none: str = None, message_multiple: str = None): + self.settings = settings + self.message_multiple = message_multiple + self.message_none = message_none + + def obtain_settings(self): + names = [] + envvars = [] + values = [] + for setting in self.settings: + names.append(setting.name) + envvars.append(setting.envvar) + value = obtain_setting(**setting.asdict(), errors="ignore") + values.append(value) + guidance = f"Use one of the CLI argument {names} or environment variable {envvars}" + if all(value is None for value in values): + message = self.message_none + if message is None: + message = f"One of the settings is required, but none of them have been specified. {guidance}" + raise ValueError(message) + if values.count(None) < len(self.settings) - 1: + message = self.message_multiple + if message is None: + message = f"The settings are mutually exclusive, but multiple of them have been specified. {guidance}" + raise ValueError(message) + return values diff --git a/examples/cloud_cluster.py b/examples/cloud_cluster.py index 3f0ddf2d..9c65eff4 100644 --- a/examples/cloud_cluster.py +++ b/examples/cloud_cluster.py @@ -1,93 +1,113 @@ # Copyright (c) 2023, Crate.io Inc. -# Distributed under the terms of the AGPLv3 license, see LICENSE. +# Distributed under the terms of the Apache 2 license. """ About ===== -Example program demonstrating how to manage a CrateDB Cloud -database cluster. - -The program assumes you are appropriately authenticated to -CrateDB Cloud, for example using `croud login --idp azuread`. - -The program obtains a single positional argument from the command line, -the CrateDB Cloud Cluster name. When omitted, it will fall back -to the `CRATEDB_CLOUD_CLUSTER_NAME` environment variable. +Example program demonstrating how to manage a CrateDB Cloud database cluster. +It obtains a database cluster identifier or name, connects to the database +cluster, optionally deploys it, and runs an example workload. Synopsis ======== :: - # Install package. + # Install utility package. pip install 'cratedb-toolkit' # Log in to CrateDB Cloud. croud login --idp azuread - # Deploy a cluster instance. - python examples/cloud_cluster.py Hotzenplotz + # Initialize a cluster instance. + python examples/cloud_cluster.py --cluster-name Hotzenplotz # Inquire list of available clusters. croud clusters list - # Run an SQL command. + # Run an example SQL command. + # TODO: Store per-cluster username and password in configuration file / system keyring. export CRATEDB_CLOUD_CLUSTER_ID='e1e38d92-a650-48f1-8a70-8133f2d5c400' export CRATEDB_USERNAME='admin' export CRATEDB_PASSWORD='H3IgNXNvQBJM3CiElOiVHuSp6CjXMCiQYhB4I9dLccVHGvvvitPSYr1vTpt4' ctk shell --command "SELECT * from sys.summits LIMIT 5;" -References -========== +Usage +===== + +The program assumes you are appropriately authenticated to the CrateDB Cloud +platform, for example using `croud login --idp azuread`. + +For addressing a database cluster, the program obtains the cluster identifier +or name from the user's environment, using command-line arguments or environment +variables. + +Configuration +============= + +The configuration settings can be specified as CLI arguments:: + + --cluster-id=e1e38d92-a650-48f1-8a70-8133f2d5c400 + --cluster-name=Hotzenplotz + +Alternatively, you can use environment variables:: + + export CRATEDB_CLOUD_CLUSTER_ID=e1e38d92-a650-48f1-8a70-8133f2d5c400 + export CRATEDB_CLOUD_CLUSTER_NAME=Hotzenplotz + +Command line arguments take precedence. When omitted, the program will +fall back to probe the environment variables. -- https://github.com/crate/jenkins-dsl/blob/master/scripts/croud-deploy-nightly.sh -- https://github.com/coiled/examples/blob/main/geospatial.ipynb """ import json import logging -import sys -from cratedb_toolkit.util.basic import obtain_cluster_name -from cratedb_toolkit.util.common import setup_logging +import cratedb_toolkit +from cratedb_toolkit.util import setup_logging logger = logging.getLogger(__name__) -def workload(cluster_name: str): +def workload(): """ Run a workload on a CrateDB database cluster on CrateDB Cloud. - ctk deploy cluster Hotzenplotz + ctk cluster start Hotzenplotz ctk shell --command "SELECT * from sys.summits LIMIT 5;" """ - # Database cluster resource handle. - from cratedb_toolkit.api import ManagedCluster + from cratedb_toolkit import ManagedCluster - # Acquire database cluster. - cluster = ManagedCluster(name=cluster_name) - cluster.acquire() + # Acquire database cluster handle, obtaining cluster identifier + # or name from the user's environment. + cluster = ManagedCluster.from_env().start() # Report information about cluster. - print(json.dumps(cluster.info.cloud)) + # TODO: Use `cluster.{print,format}_info()`. + print(json.dumps(cluster.info.cloud)) # noqa: T201 # Run database workload. + # TODO: Enable acquiring a client handle. # cratedb = cluster.get_connection_client() # noqa: ERA001 # cratedb.run_sql("SELECT * from sys.summits LIMIT 5;") # noqa: ERA001 + # Stop cluster again. + cluster.stop() -def main(): - """ - Obtain cluster name and SQL command, and run program. - """ - try: - cluster_name = obtain_cluster_name() - except ValueError as ex: - logger.error(ex) - sys.exit(1) - workload(cluster_name) +def main(): + workload() if __name__ == "__main__": + # Configure toolkit environment to be suitable for a CLI application, with + # interactive guidance, and accepting configuration settings from the environment. setup_logging() + cratedb_toolkit.configure( + runtime_errors="exit", + settings_accept_cli=True, + settings_accept_env=True, + settings_errors="exit", + ) + + # Run program. main() diff --git a/examples/cloud_import.py b/examples/cloud_import.py index bfefafdd..42759709 100644 --- a/examples/cloud_import.py +++ b/examples/cloud_import.py @@ -1,5 +1,5 @@ # Copyright (c) 2023, Crate.io Inc. -# Distributed under the terms of the AGPLv3 license, see LICENSE. +# Distributed under the terms of the Apache 2 license. """ About ===== @@ -11,15 +11,11 @@ gzip compression. They can be acquired from the local filesystem, or from remote HTTP and AWS S3 resources. -The program obtains a single positional argument from the command line, -the CrateDB Cloud Cluster identifier. When omitted, it will fall back -to the `CRATEDB_CLOUD_CLUSTER_ID` environment variable. - Synopsis ======== :: - # Install package. + # Install utility package. pip install 'cratedb-toolkit' # Log in to CrateDB Cloud. @@ -28,77 +24,104 @@ # Inquire list of available clusters. croud clusters list - # Invoke import of CSV and Parquet files. - python examples/cloud_import.py e1e38d92-a650-48f1-8a70-8133f2d5c400 + # Invoke example import of CSV and Parquet files. + python examples/cloud_import.py --cluster-id e1e38d92-a650-48f1-8a70-8133f2d5c400 + +Usage +===== + +The program assumes you are appropriately authenticated to the CrateDB Cloud +platform, for example using `croud login --idp azuread`. + +For addressing a database cluster, the program obtains the cluster identifier +or name from the user's environment, using command-line arguments or environment +variables. + +Configuration +============= + +The configuration settings can be specified as CLI arguments:: + + --cluster-id=e1e38d92-a650-48f1-8a70-8133f2d5c400 + --cluster-name=Hotzenplotz + +Alternatively, you can use environment variables:: + + export CRATEDB_CLOUD_CLUSTER_ID=e1e38d92-a650-48f1-8a70-8133f2d5c400 + export CRATEDB_CLOUD_CLUSTER_NAME=Hotzenplotz + +Command line arguments take precedence. When omitted, the program will +fall back to probe the environment variables. """ import logging -import sys -from cratedb_toolkit.api.main import ManagedCluster -from cratedb_toolkit.model import InputOutputResource, TableAddress -from cratedb_toolkit.util.basic import obtain_cluster_id -from cratedb_toolkit.util.common import setup_logging +import cratedb_toolkit +from cratedb_toolkit import InputOutputResource, ManagedCluster, TableAddress +from cratedb_toolkit.util import setup_logging logger = logging.getLogger(__name__) -def import_csv(cluster_id: str): +def import_csv(): """ Import CSV file from HTTP, derive table name from file name. - ctk shell --command 'SELECT * FROM "nab-machine-failure" LIMIT 10;' + A corresponding command to query the imported data is:: + + ctk shell --command 'SELECT * FROM "nab-machine-failure" LIMIT 10;' """ - # Acquire database cluster resource handle. - cluster = ManagedCluster(cloud_id=cluster_id) + # Acquire database cluster handle, obtaining cluster identifier + # or name from the user's environment. + cluster = ManagedCluster.from_env().start() - # Encapsulate source parameter. + # Define data source. url = "https://github.com/crate/cratedb-datasets/raw/main/machine-learning/timeseries/nab-machine-failure.csv" - resource = InputOutputResource(url=url) + source = InputOutputResource(url=url) # Invoke import job. Without `target` argument, the destination # table name will be derived from the input file name. - response, success = cluster.load_table(resource=resource) - if not success: - sys.exit(1) + cluster.load_table(source=source) -def import_parquet(cluster_id: str): +def import_parquet(): """ - Import Parquet file from HTTP, and use specific schema and table names. + Import Parquet file from HTTP, use specific schema and table names. - ctk shell --command 'SELECT * FROM "testdrive"."yc-201907" LIMIT 10;' + A corresponding command to query the imported data is:: + + ctk shell --command 'SELECT * FROM "testdrive"."yc-201907" LIMIT 10;' """ - # Acquire database cluster resource handle. - cluster = ManagedCluster(cloud_id=cluster_id) + # Acquire database cluster handle, obtaining cluster identifier + # or name from the user's environment. + cluster = ManagedCluster.from_env().start() - # Encapsulate source and target parameters. + # Define data source and target table. url = "https://github.com/crate/cratedb-datasets/raw/main/timeseries/yc.2019.07-tiny.parquet.gz" - resource = InputOutputResource(url=url) + source = InputOutputResource(url=url) target = TableAddress(schema="testdrive", table="yc-201907") # Invoke import job. The destination table name is explicitly specified. - response, success = cluster.load_table(resource=resource, target=target) - if not success: - sys.exit(1) + cluster.load_table(source=source, target=target) def main(): - """ - Obtain cluster identifier, and run program. - """ - try: - cluster_id = obtain_cluster_id() - except ValueError as ex: - logger.error(ex) - sys.exit(1) - - import_csv(cluster_id) - import_parquet(cluster_id) + import_csv() + import_parquet() if __name__ == "__main__": + # Configure toolkit environment to be suitable for a CLI application, with + # interactive guidance, and accepting configuration settings from the environment. setup_logging() + cratedb_toolkit.configure( + runtime_errors="exit", + settings_accept_cli=True, + settings_accept_env=True, + settings_errors="exit", + ) + + # Run program. main() diff --git a/pyproject.toml b/pyproject.toml index 2d5d4be9..cf560d8b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -243,10 +243,9 @@ extend-exclude = [ ] [tool.ruff.per-file-ignores] -"tests/*" = ["S101"] # Allow use of `assert`, and `print`. -"examples/*" = ["T201"] # Allow `print` -"cratedb_toolkit/retention/cli.py" = ["T201"] # Allow `print` -"cratedb_toolkit/sqlalchemy/__init__.py" = ["F401"] # Allow `module´ imported but unused +"tests/*" = ["S101"] # Allow use of `assert`. +"cratedb_toolkit/retention/cli.py" = ["T201"] # Allow use of `print`. +"cratedb_toolkit/sqlalchemy/__init__.py" = ["F401"] # Allow "module imported but unused". [tool.setuptools.packages.find] namespaces = false diff --git a/tests/conftest.py b/tests/conftest.py index c2f45124..957dc94b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,7 @@ import pytest import responses -from cratedb_toolkit.testing.testcontainers.cratedb import CrateDBContainer +from cratedb_toolkit.testing.testcontainers.util import PytestTestcontainerAdapter from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.common import setup_logging @@ -26,33 +26,32 @@ ] -class CrateDBFixture: +class CrateDBFixture(PytestTestcontainerAdapter): """ A little helper wrapping Testcontainer's `CrateDBContainer` and CrateDB Toolkit's `DatabaseAdapter`, agnostic of the test framework. """ def __init__(self): - self.cratedb = None + self.container = None self.database: DatabaseAdapter = None - self.setup() + super().__init__() def setup(self): + from cratedb_toolkit.testing.testcontainers.cratedb import CrateDBContainer + # TODO: Make image name configurable. - self.cratedb = CrateDBContainer("crate/crate:nightly") - self.cratedb.start() + self.container = CrateDBContainer("crate/crate:nightly") + self.container.start() self.database = DatabaseAdapter(dburi=self.get_connection_url()) - def finalize(self): - self.cratedb.stop() - def reset(self): # TODO: Make list of tables configurable. for reset_table in RESET_TABLES: self.database.connection.exec_driver_sql(f"DROP TABLE IF EXISTS {reset_table};") def get_connection_url(self, *args, **kwargs): - return self.cratedb.get_connection_url(*args, **kwargs) + return self.container.get_connection_url(*args, **kwargs) @pytest.fixture(scope="session", autouse=True) @@ -63,7 +62,7 @@ def configure_database_schema(session_mocker): If not configured otherwise, the test suite currently uses `testdrive-ext`. """ - session_mocker.patch("os.environ", {"CRATEDB_EXT_SCHEMA": TESTDRIVE_EXT_SCHEMA}) + session_mocker.patch.dict("os.environ", {"CRATEDB_EXT_SCHEMA": TESTDRIVE_EXT_SCHEMA}) @pytest.fixture(scope="session") @@ -74,7 +73,7 @@ def cratedb_service(): db = CrateDBFixture() db.reset() yield db - db.finalize() + db.stop() @pytest.fixture(scope="function") @@ -92,7 +91,11 @@ def cloud_cluster_mock(): responses.Response( method="GET", url="https://console.cratedb.cloud/api/v2/clusters/e1e38d92-a650-48f1-8a70-8133f2d5c400/", - json={"url": "https://testdrive.example.org:4200/", "project_id": "3b6b7c82-d0ab-458c-ae6f-88f8346765ee"}, + json={ + "url": "https://testdrive.example.org:4200/", + "project_id": "3b6b7c82-d0ab-458c-ae6f-88f8346765ee", + "name": "testcluster", + }, ) ) responses.add( diff --git a/tests/io/influxdb/conftest.py b/tests/io/influxdb/conftest.py index cdbeb89c..5f86f68c 100644 --- a/tests/io/influxdb/conftest.py +++ b/tests/io/influxdb/conftest.py @@ -2,6 +2,8 @@ import pytest +from cratedb_toolkit.testing.testcontainers.util import PytestTestcontainerAdapter + logger = logging.getLogger(__name__) @@ -11,7 +13,7 @@ ] -class InfluxDB2Fixture: +class InfluxDB2Fixture(PytestTestcontainerAdapter): """ A little helper wrapping Testcontainer's `InfluxDB2Container`. """ @@ -21,7 +23,7 @@ def __init__(self): self.container = None self.client: InfluxDBClient = None - self.setup() + super().__init__() def setup(self): # TODO: Make image name configurable. @@ -31,9 +33,6 @@ def setup(self): self.container.start() self.client = self.container.get_connection_client() - def finalize(self): - self.container.stop() - def reset(self): """ Delete all buckets used for testing. @@ -55,7 +54,7 @@ def influxdb_service(): db = InfluxDB2Fixture() db.reset() yield db - db.finalize() + db.stop() @pytest.fixture(scope="function") diff --git a/tests/io/mongodb/conftest.py b/tests/io/mongodb/conftest.py index 5ff938eb..15b74c40 100644 --- a/tests/io/mongodb/conftest.py +++ b/tests/io/mongodb/conftest.py @@ -2,6 +2,8 @@ import pytest +from cratedb_toolkit.testing.testcontainers.util import PytestTestcontainerAdapter + logger = logging.getLogger(__name__) @@ -11,7 +13,7 @@ ] -class MongoDBFixture: +class MongoDBFixture(PytestTestcontainerAdapter): """ A little helper wrapping Testcontainer's `MongoDbContainer`. """ @@ -21,7 +23,7 @@ def __init__(self): self.container = None self.client: MongoClient = None - self.setup() + super().__init__() def setup(self): # TODO: Make image name configurable. @@ -31,9 +33,6 @@ def setup(self): self.container.start() self.client = self.container.get_connection_client() - def finalize(self): - self.container.stop() - def reset(self): """ Drop all databases used for testing. @@ -56,7 +55,7 @@ def mongodb_service(): db = MongoDBFixture() db.reset() yield db - db.finalize() + db.stop() @pytest.fixture(scope="function") diff --git a/tests/io/test_import.py b/tests/io/test_import.py index 9eeaa3d3..c3354533 100644 --- a/tests/io/test_import.py +++ b/tests/io/test_import.py @@ -50,7 +50,7 @@ def test_import_csv_dask_with_progressbar(cratedb, dummy_csv): assert result == [(2,)] -@pytest.mark.skip("Does not work. Why?") +@pytest.mark.skip("Does not work. Q: Why? A: Response mocking? Q: And now? A: Just patch the low-level functions!") @responses.activate def test_import_cloud_file(tmp_path, caplog, cloud_cluster_mock): """ diff --git a/tests/retention/test_cli.py b/tests/retention/test_cli.py index b100e50a..df52f65c 100644 --- a/tests/retention/test_cli.py +++ b/tests/retention/test_cli.py @@ -61,7 +61,8 @@ def test_setup_verbose(caplog, cratedb, settings): assert result.exit_code == 0 assert cratedb.database.table_exists(settings.policy_table.fullname) is True - assert 3 <= len(caplog.records) <= 10 + # TODO: Validate a few log messages, instead of counting them. + assert 3 <= len(caplog.records) <= 15 def test_setup_dryrun(caplog, cratedb, settings): @@ -105,7 +106,7 @@ def test_setup_failure_envvar_invalid_dburi(mocker): program fails correctly, when pointing it to an arbitrary address. """ - mocker.patch("os.environ", {"CRATEDB_URI": "crate://localhost:5555"}) + mocker.patch.dict("os.environ", {"CRATEDB_URI": "crate://localhost:5555"}) runner = CliRunner() with pytest.raises(OperationalError) as ex: @@ -367,7 +368,7 @@ def test_run_snapshot_fs(caplog, cratedb, store, database, sensor_readings, sens """ # Acquire OCI container handle, to introspect it. - tc_container: DockerContainer = cratedb.cratedb + tc_container: DockerContainer = cratedb.container oci_container: Container = tc_container.get_wrapped_container() # Define snapshot directory. diff --git a/tests/retention/test_examples.py b/tests/retention/test_examples.py index 95d2be8c..7ce8ad6e 100644 --- a/tests/retention/test_examples.py +++ b/tests/retention/test_examples.py @@ -1,9 +1,9 @@ # Copyright (c) 2023, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. -from unittest.mock import patch - import responses +import cratedb_toolkit + def test_example_edit(store): """ @@ -24,13 +24,18 @@ def test_example_retire_cutoff(store): @responses.activate -def test_example_cloud_import(store, cloud_cluster_mock): +def test_example_cloud_import(mocker, store, cloud_cluster_mock): """ Verify that the program `examples/cloud_import.py` works. """ - from examples.cloud_import import main + cratedb_toolkit.configure( + settings_accept_cli=True, + settings_accept_env=True, + ) cluster_id = "e1e38d92-a650-48f1-8a70-8133f2d5c400" - with patch("examples.cloud_import.obtain_cluster_id", return_value=cluster_id): - main() + mocker.patch.dict("os.environ", {"CRATEDB_CLOUD_CLUSTER_ID": cluster_id}) + from examples.cloud_import import main + + main()