Skip to content

Commit

Permalink
Adds unit tests for the PrefectServer and PrefectWorkPool models (#19)
Browse files Browse the repository at this point in the history
These tests confirm validation rules and the generation of manifests, but not
the interaction with the Kubernetes API.

Part of #13
Part of #14
  • Loading branch information
chrisguidry authored Aug 2, 2024
1 parent 2f3fa17 commit 9f7dab8
Show file tree
Hide file tree
Showing 7 changed files with 452 additions and 59 deletions.
1 change: 1 addition & 0 deletions requirements-dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mypy
pre-commit
pytest
pytest-coverage
pytest-xdist
ruff
types-pyyaml
yamlfix
Expand Down
5 changes: 5 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ distlib==0.3.8
# via virtualenv
distro==1.9.0
# via ruyaml
execnet==2.1.1
# via pytest-xdist
executing==2.0.1
# via stack-data
filelock==3.15.4
Expand Down Expand Up @@ -164,12 +166,15 @@ pytest==8.3.2
# via
# -r requirements-dev.in
# pytest-cov
# pytest-xdist
pytest-cov==5.0.0
# via pytest-cover
pytest-cover==3.0.0
# via pytest-coverage
pytest-coverage==0.0
# via -r requirements-dev.in
pytest-xdist==3.6.1
# via -r requirements-dev.in
python-dateutil==2.9.0.post0
# via
# -r requirements.txt
Expand Down
44 changes: 26 additions & 18 deletions src/prefect_operator/resources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, ClassVar, Iterable, Self
from typing import Any, ClassVar, Iterable

from pydantic import BaseModel, PrivateAttr, ValidationInfo, model_validator
from pydantic import BaseModel, ValidationInfo, model_validator


class CustomResource(BaseModel):
Expand Down Expand Up @@ -96,21 +96,29 @@ def collapse_optionals(obj: Any):
return schema


class NamedResource(BaseModel):
_name: str = PrivateAttr()
class NamedResource(CustomResource):
name: str
namespace: str

@property
def name(self) -> str:
return self._name

_namespace: str = PrivateAttr()

@property
def namespace(self) -> str:
return self._namespace
@model_validator(mode="before")
@classmethod
def set_name_and_namespace(
cls, values: dict[str, Any], validation_info: ValidationInfo
) -> dict[str, Any]:
if validation_info.context:
values = dict(values)
values.setdefault("name", validation_info.context.get("name"))
values.setdefault("namespace", validation_info.context.get("namespace"))
return values

@model_validator(mode="after")
def set_name_and_namespace(self, validation_info: ValidationInfo) -> Self:
self._name = validation_info.context["name"]
self._namespace = validation_info.context["namespace"]
return self
@classmethod
def model_json_schema_inlined(cls) -> dict[str, Any]:
schema = super().model_json_schema_inlined()
# The name and namespace attributes aren't actually part of the spec
schema["properties"].pop("name", None)
schema["properties"].pop("namespace", None)
schema["required"].remove("name")
schema["required"].remove("namespace")
if not schema["required"]:
schema.pop("required")
return schema
72 changes: 33 additions & 39 deletions src/prefect_operator/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@
from pydantic import BaseModel, Field

from . import DEFAULT_PREFECT_VERSION
from .resources import CustomResource, NamedResource
from .resources import NamedResource


class PrefectSqliteDatabase(BaseModel):
storageClassName: str
size: str

@property
def is_file_based(self) -> bool:
return True

def desired_persistent_volume_claim(
self, server: "PrefectServer"
) -> dict[str, Any] | None:
Expand All @@ -34,7 +30,7 @@ def desired_persistent_volume_claim(
},
}

def configure_prefect_server(
def configure_prefect_server_workload(
self,
server: "PrefectServer",
prefect_server_workload_spec: dict[str, Any],
Expand All @@ -47,7 +43,7 @@ def configure_prefect_server(
[
{
"name": "PREFECT_API_DATABASE_MIGRATE_ON_START",
"value": "true",
"value": "True",
},
{
"name": "PREFECT_API_DATABASE_CONNECTION_URL",
Expand Down Expand Up @@ -86,16 +82,12 @@ class PrefectPostgresDatabase(BaseModel):
passwordSecretKeyRef: SecretKeyReference
database: str

@property
def is_file_based(self) -> bool:
return False

def desired_persistent_volume_claim(
self, server: "PrefectServer"
) -> dict[str, Any] | None:
return None

def configure_prefect_server(
def configure_prefect_server_workload(
self,
server: "PrefectServer",
prefect_server_workload_spec: dict[str, Any],
Expand Down Expand Up @@ -125,43 +117,43 @@ def configure_prefect_server(
},
{
"name": "PREFECT_API_DATABASE_MIGRATE_ON_START",
"value": "false",
"value": "False",
},
]
)

def desired_database_migration_job(self, server: "PrefectServer") -> dict[str, Any]:
migration_container = {
"name": "migrate",
"image": f"prefecthq/prefect:{server.version}-python3.12",
"env": [s.as_environment_variable() for s in server.settings],
"command": [
"prefect",
"server",
"database",
"upgrade",
"--yes",
],
}
job_spec = {
"template": {
"spec": {
"containers": [migration_container],
"restartPolicy": "OnFailure",
},
},
}

self.configure_prefect_server_workload(server, job_spec, migration_container)

return {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"namespace": server.namespace,
"name": f"{server.name}-migrate",
},
"spec": {
"template": {
"metadata": {"labels": {"app": server.name}},
"spec": {
"containers": [
{
"name": "migrate",
"image": f"prefecthq/prefect:{server.version}-python3.12",
"env": [
s.as_environment_variable() for s in server.settings
],
"command": [
"prefect",
"server",
"database",
"upgrade",
"--yes",
],
}
],
"restartPolicy": "OnFailure",
},
},
},
"spec": job_spec,
}


Expand All @@ -173,7 +165,7 @@ def as_environment_variable(self) -> dict[str, str]:
return {"name": self.name, "value": self.value}


class PrefectServer(CustomResource, NamedResource):
class PrefectServer(NamedResource):
kind: ClassVar[str] = "PrefectServer"
plural: ClassVar[str] = "prefectservers"
singular: ClassVar[str] = "prefectserver"
Expand Down Expand Up @@ -231,7 +223,9 @@ def desired_deployment(self) -> dict[str, Any]:
if not database:
raise NotImplementedError("No database defined")

database.configure_prefect_server(self, deployment_spec, container_template)
database.configure_prefect_server_workload(
self, deployment_spec, container_template
)

return {
"apiVersion": "apps/v1",
Expand Down
4 changes: 2 additions & 2 deletions src/prefect_operator/work_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import BaseModel, Field

from . import DEFAULT_PREFECT_VERSION
from .resources import CustomResource, NamedResource
from .resources import NamedResource


class PrefectServerReference(BaseModel):
Expand All @@ -28,7 +28,7 @@ def client(self) -> Generator[httpx.Client, None, None]:
yield c


class PrefectWorkPool(CustomResource, NamedResource):
class PrefectWorkPool(NamedResource):
kind: ClassVar[str] = "PrefectWorkPool"
plural: ClassVar[str] = "prefectworkpools"
singular: ClassVar[str] = "prefectworkpool"
Expand Down
Loading

0 comments on commit 9f7dab8

Please sign in to comment.