Skip to content

Commit

Permalink
Cloud API: Naming things, improved UX, more OO
Browse files Browse the repository at this point in the history
- Naming things: Use more appropriate names for variables.
- OO: Add `EnvironmentConfiguration`, to manage information about the
  toolkit environment.
- OO: Add `CloudJob` entity, to manage information about a cloud job,
  and to streamline passing of information.
- OO: Refactor low-level methods to `CloudCluster`.
- Capability to configure error handling based on the environment.
- Capability to conveniently acquire configuration settings from the
  environment, for obtaining the CrateDB Cluster identifier or name.
- Tests: Add `DockerSkippingContainer` and `PytestTestcontainerAdapter`.
  Both skip test execution when the Docker daemon is not running, or not
  available to the environment.
- Examples: Improve UX of `cloud_*.py` example programs.
  • Loading branch information
amotl committed Nov 15, 2023
1 parent f55f9ac commit a900403
Show file tree
Hide file tree
Showing 27 changed files with 881 additions and 351 deletions.
4 changes: 4 additions & 0 deletions cratedb_toolkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
__version__ = version(__appname__)
except PackageNotFoundError: # pragma: no cover
__version__ = "unknown"

from .api import ManagedCluster # noqa: F401
from .config import configure # noqa: F401
from .model import InputOutputResource, TableAddress # noqa: F401
181 changes: 132 additions & 49 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,166 @@
import dataclasses
import json
import logging
import sys
import typing as t
from abc import abstractmethod

from cratedb_toolkit.api.guide import GuidingTexts
from cratedb_toolkit.cluster.util import deploy_cluster, get_cluster_by_name, get_cluster_info
from cratedb_toolkit.config import CONFIG
from cratedb_toolkit.exception import CroudException, OperationFailed
from cratedb_toolkit.io.croud import CloudIo
from cratedb_toolkit.io.croud import CloudIo, CloudJob
from cratedb_toolkit.model import ClusterInformation, DatabaseAddress, InputOutputResource, TableAddress
from cratedb_toolkit.util.runtime import flexfun
from cratedb_toolkit.util.setting import RequiredMutuallyExclusiveSettingsGroup, Setting

logger = logging.getLogger(__name__)


class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, source: InputOutputResource, target: TableAddress):
raise NotImplementedError("Child class needs to implement this method")


@dataclasses.dataclass
class ManagedCluster(ClusterBase):
"""
Wrap a managed CrateDB database cluster on CrateDB Cloud.
"""

cloud_id: t.Optional[str] = None
name: t.Optional[str] = None
address: t.Optional[DatabaseAddress] = None
info: t.Optional[ClusterInformation] = None
exists: bool = False
settings_spec = RequiredMutuallyExclusiveSettingsGroup(
Setting(
name="--cluster-id",
envvar="CRATEDB_CLOUD_CLUSTER_ID",
help="The CrateDB Cloud cluster identifier, an UUID",
),
Setting(
name="--cluster-name",
envvar="CRATEDB_CLOUD_CLUSTER_NAME",
help="The CrateDB Cloud cluster name",
),
)

def __init__(
self,
id: str = None, # noqa: A002
name: str = None,
address: DatabaseAddress = None,
info: ClusterInformation = None,
):
self.id = id
self.name = name
self.address = address
self.info: ClusterInformation = info or ClusterInformation()
self.exists: bool = False
if self.id is None and self.name is None:
raise ValueError("Failed to address cluster: Either cluster identifier or name needs to be specified")

@classmethod
@flexfun(domain="settings")
def from_env(cls) -> "ManagedCluster":
"""
Obtain CrateDB Cloud cluster identifier or name from user environment.
The settings are mutually exclusive.
def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id} ({self.name})")
self.probe()
When the toolkit environment is configured with `settings_accept_cli`,
the settings can be specified that way:
--cluster-id=e1e38d92-a650-48f1-8a70-8133f2d5c400
--cluster-name=Hotzenplotz
When the toolkit environment is configured with `settings_accept_env`,
the settings can be specified that way:
export CRATEDB_CLOUD_CLUSTER_ID=e1e38d92-a650-48f1-8a70-8133f2d5c400
export CRATEDB_CLOUD_CLUSTER_NAME=Hotzenplotz
"""
if not CONFIG.settings_accept_cli or not CONFIG.settings_accept_env:
raise ValueError(
"Unable to obtain cluster identifier or name without accepting settings from user environment"
)
try:
cluster_id, cluster_name = cls.settings_spec.obtain_settings()
return cls(id=cluster_id, name=cluster_name)

# TODO: With `flexfun`, can this section be improved?
except ValueError as ex:
logger.error(f"Failed to address cluster: {ex}")
if CONFIG.settings_errors == "exit":
sys.exit(1)
else:
raise

def stop(self) -> "ManagedCluster":
logger.warning("Stopping cluster not implemented yet")
return self

def delete(self) -> "ManagedCluster":
return self

def probe(self):
def probe(self) -> "ManagedCluster":
"""
Probe a CrateDB Cloud cluster, API-wise.
"""
if not self.cloud_id and not self.name:
self.exists = False
raise ValueError("Either cluster identifier or name needs to be specified")
try:
if self.cloud_id:
self.info = get_cluster_info(cluster_id=self.cloud_id)
if self.id:
self.info = get_cluster_info(cluster_id=self.id)
self.name = self.info.cloud["name"]
else:
elif self.name:
self.info = get_cluster_by_name(self.name)
self.cloud_id = self.info.cloud["id"]
self.id = self.info.cloud["id"]
else:
self.exists = False
raise ValueError("Failed to address cluster: Either cluster identifier or name needs to be specified")
except CroudException as ex:
self.exists = False
if "Cluster not found" not in str(ex):
raise
if self.info:
if self.info.cloud:
self.exists = True
logger.info(f"Cluster information: name={self.info.cloud.get('name')}, url={self.info.cloud.get('url')}")
return self

def acquire(self):
@flexfun(domain="runtime")
def start(self) -> "ManagedCluster":
"""
Start a database cluster.
When cluster does not exist, acquire/deploy it.
"""
logger.info(f"Deploying/starting/resuming CrateDB Cloud Cluster: id={self.id}, name={self.name}")
self.acquire()
return self

def acquire(self) -> "ManagedCluster":
"""
Acquire a database cluster.
When cluster does not exist, deploy it.
This means going through the steps of deploy and/or start, as applicable.
- When cluster does not exist, create/deploy it.
- When a cluster exists, but is stopped/hibernated, start/resume it.
"""
self.probe()
if not self.exists:
logger.info(f"Cluster does not exist: {self.name}")
logger.info(f"Deploying cluster: {self.name}")
if self.deploy():
self.probe()
else:
logger.info(f"Cluster does not exist, deploying it: id={self.id}, name={self.name}")
self.deploy()
self.probe()
if not self.exists:
# TODO: Is it possible to gather and propagate more information why the deployment failed?
raise CroudException(f"Deployment of cluster failed: {self.name}")
return self

def deploy(self):
def deploy(self) -> "ManagedCluster":
"""
Run the cluster deployment procedure.
"""
try:
deploy_cluster(self.name)
except CroudException:
return False
return True

def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None):
# FIXME: Accept id or name.
if self.name is None:
raise ValueError("Need cluster name to deploy")
deploy_cluster(self.name)
return self

@flexfun(domain="runtime")
def load_table(self, source: InputOutputResource, target: t.Optional[TableAddress] = None) -> CloudJob:
"""
Load data into a database table on CrateDB Cloud.
Expand All @@ -93,33 +173,36 @@ def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddr
https://console.cratedb.cloud
"""

self.probe()
target = target or TableAddress()

# FIXME: Accept id or name.
if self.id is None:
raise ValueError("Need cluster identifier to load table")

try:
cio = CloudIo(cluster_id=self.cloud_id)
cio = CloudIo(cluster_id=self.id)
except CroudException as ex:
msg = f"Connecting to cluster resource failed: {self.cloud_id}. Reason: {ex}"
if "Resource not found" in str(ex):
logger.error(msg)
return None, False
msg = f"Connecting to cluster resource failed: {self.id}. Reason: {ex}"
logger.exception(msg)
raise OperationFailed(msg) from ex

try:
job_info, success = cio.load_resource(resource=resource, target=target)
logger.info("Job information:\n%s", json.dumps(job_info, indent=2))
cloud_job = cio.load_resource(resource=source, target=target)
logger.info("Job information:\n%s", json.dumps(cloud_job.info, indent=2))
# TODO: Explicitly report about `failed_records`, etc.
texts = GuidingTexts(
admin_url=self.info.cloud["url"],
table_name=job_info["destination"]["table"],
table_name=cloud_job.info["destination"]["table"],
)
if success:
if cloud_job.success:
logger.info("Data loading was successful: %s", texts.success())
return job_info, success
return cloud_job
else:
# TODO: Add "reason" to exception message.
logger.error(f"Data loading failed: {texts.error()}")
raise OperationFailed("Data loading failed")
message = f"Data loading failed: {cloud_job.message}"
logger.error(f"{message}{texts.error()}")
raise OperationFailed(message)

# When exiting so, it is expected that error logging has taken place appropriately.
except CroudException as ex:
Expand All @@ -137,7 +220,7 @@ class StandaloneCluster(ClusterBase):
address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, source: InputOutputResource, target: TableAddress):
"""
Load data into a database table on a standalone CrateDB Server.
Expand All @@ -148,7 +231,7 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""
source_url = resource.url
source_url = source.url
target_url = self.address.dburi
if source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy
Expand Down
Loading

0 comments on commit a900403

Please sign in to comment.