diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4935550..5fdaddb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,3 +17,11 @@ repos: hooks: - id: mypy additional_dependencies: [pytest==8.3.2, types-pyyaml==6.0.12.20240724] + - repo: local + hooks: + - id: generate-crds + name: Generate CRD manifests + entry: bash -c 'python -m prefect_operator generate-crds > crds.yaml' + language: system + types: [python] + pass_filenames: false diff --git a/Makefile b/Makefile index a7d80f0..55b8bbc 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ endif mv .bookkeeping/uv.next .bookkeeping/uv -.bookkeeping/development.txt: .bookkeeping/uv requirements.txt +.bookkeeping/development.txt: .bookkeeping/uv requirements.txt pyproject.toml mkdir -p .bookkeeping cat requirements.txt > .bookkeeping/development.txt.next diff --git a/crds.yaml b/crds.yaml index d04a784..f9ad2d1 100644 --- a/crds.yaml +++ b/crds.yaml @@ -4,63 +4,94 @@ metadata: name: prefectservers.prefect.io spec: group: prefect.io - scope: Namespaced names: + kind: PrefectServer plural: prefectservers singular: prefectserver - kind: PrefectServer + scope: Namespaced versions: - - name: v3 - served: true - storage: true - schema: - openAPIV3Schema: - type: object - properties: - spec: - type: object - properties: - version: - type: string - sqlite: - type: object + - name: v3 + schema: + openAPIV3Schema: + properties: + spec: + properties: + postgres: + default: null + properties: + database: + title: Database + type: string + host: + title: Host + type: string + passwordSecretKeyRef: + properties: + key: + title: Key + type: string + name: + title: Name + type: string + required: + - name + - key + title: SecretKeyReference + type: object + port: + title: Port + type: integer + user: + title: User + type: string + required: + - host + - port + - user + - passwordSecretKeyRef + - database + title: PrefectPostgresDatabase + type: object + settings: + default: [] + items: 
properties: - storageClassName: + name: + title: Name type: string - size: + value: + title: Value type: string required: - - storageClassName - - size - postgres: + - name + - value + title: PrefectSetting type: object - properties: - host: - type: string - port: - type: number - user: - type: string - passwordSecretKeyRef: - type: object - properties: - name: - type: string - key: - type: string - database: - type: string - settings: - type: array - items: - type: object - properties: - name: - type: string - value: - type: string - required: - - spec + title: Settings + type: array + sqlite: + default: null + properties: + size: + title: Size + type: string + storageClassName: + title: Storageclassname + type: string + required: + - storageClassName + - size + title: PrefectSqliteDatabase + type: object + version: + default: 3.0.0rc13 + title: Version + type: string + title: PrefectServer + type: object + type: object + served: true + storage: true --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -68,34 +99,43 @@ metadata: name: prefectworkpools.prefect.io spec: group: prefect.io - scope: Namespaced names: + kind: PrefectWorkPool plural: prefectworkpools singular: prefectworkpool - kind: PrefectWorkPool + scope: Namespaced versions: - - name: v3 - served: true - storage: true - schema: - openAPIV3Schema: - type: object - properties: - spec: - type: object - properties: - version: - type: string - server: - type: object - properties: - namespace: - type: string - name: - type: string - required: - - name - workers: - type: number - required: - - spec + - name: v3 + schema: + openAPIV3Schema: + properties: + spec: + properties: + server: + properties: + name: + title: Name + type: string + namespace: + default: '' + title: Namespace + type: string + required: + - name + title: PrefectServerReference + type: object + version: + default: 3.0.0rc13 + title: Version + type: string + workers: + default: 1 + title: Workers + type: integer + 
required: + - server + title: PrefectWorkPool + type: object + type: object + served: true + storage: true diff --git a/deploy-example b/deploy-example index fdfb243..8febf57 100755 --- a/deploy-example +++ b/deploy-example @@ -1,4 +1,5 @@ #!/bin/bash +python -m prefect_operator generate-crds > crds.yaml kubectl apply -f crds.yaml for f in examples/$1/*.yaml; do diff --git a/prefect_operator/__init__.py b/prefect_operator/__init__.py index 7e24f8d..24477f7 100644 --- a/prefect_operator/__init__.py +++ b/prefect_operator/__init__.py @@ -1,569 +1,3 @@ -import time -from contextlib import contextmanager -from typing import Any, Generator, Optional, Self - -import httpx -import kopf -import kubernetes -from pydantic import BaseModel, Field, PrivateAttr, ValidationInfo, model_validator +from prefect_operator._version import __version__ # noqa: F401 DEFAULT_PREFECT_VERSION = "3.0.0rc13" - - -class NamedResource(BaseModel): - _name: str = PrivateAttr() - - @property - def name(self) -> str: - return self._name - - _namespace: str = PrivateAttr() - - @property - def namespace(self) -> str: - return self._namespace - - @model_validator(mode="after") - def set_name_and_namespace(self, validation_info: ValidationInfo) -> Self: - self._name = validation_info.context["name"] - self._namespace = validation_info.context["namespace"] - return self - - -class PrefectSqliteDatabase(BaseModel): - storageClassName: str - size: str - - @property - def is_file_based(self) -> bool: - return True - - def configure_prefect_server( - self, - prefect_server_workload_spec: dict[str, Any], - prefect_server_container: dict[str, Any], - ) -> None: - prefect_server_container["env"].extend( - [ - { - "name": "PREFECT_API_DATABASE_MIGRATE_ON_START", - "value": "true", - }, - { - "name": "PREFECT_API_DATABASE_CONNECTION_URL", - "value": "sqlite+aiosqlite:////var/lib/prefect/prefect.db", - }, - ] - ) - prefect_server_container["volumeMounts"] = [ - { - "name": "database", - "mountPath": 
"/var/lib/prefect/", - } - ] - prefect_server_workload_spec["volumeClaimTemplates"] = [ - { - "metadata": {"name": "database"}, - "spec": { - "accessModes": ["ReadWriteOnce"], - "storageClassName": self.storageClassName, - "resources": {"requests": {"storage": self.size}}, - }, - } - ] - - def desired_database_migration_job( - self, server: "PrefectServer" - ) -> dict[str, Any] | None: - return None - - -class SecretKeyReference(BaseModel): - name: str - key: str - - -class PrefectPostgresDatabase(BaseModel): - host: str - port: int - user: str - passwordSecretKeyRef: SecretKeyReference - database: str - - @property - def is_file_based(self) -> bool: - return False - - def configure_prefect_server( - self, - prefect_server_workload_spec: dict[str, Any], - prefect_server_container: dict[str, Any], - ) -> None: - prefect_server_container["env"].extend( - [ - { - "name": "PREFECT_API_DATABASE_CONNECTION_URL", - "value": ( - "postgresql+asyncpg://" - f"{ self.user }:${{PREFECT_API_DATABASE_PASSWORD}}" - "@" - f"{ self.host }:{ self.port }" - "/" - f"{self.database}" - ), - }, - { - "name": "PREFECT_API_DATABASE_PASSWORD", - "valueFrom": { - "secretKeyRef": { - "name": self.passwordSecretKeyRef.name, - "key": self.passwordSecretKeyRef.key, - } - }, - }, - { - "name": "PREFECT_API_DATABASE_MIGRATE_ON_START", - "value": "false", - }, - ] - ) - - def desired_database_migration_job(self, server: "PrefectServer") -> dict[str, Any]: - return { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": { - "namespace": server.namespace, - "name": f"{server.name}-migrate", - }, - "spec": { - "template": { - "metadata": {"labels": {"app": server.name}}, - "spec": { - "containers": [ - { - "name": "migrate", - "image": f"prefecthq/prefect:{server.version}-python3.12", - "env": [ - s.as_environment_variable() for s in server.settings - ], - "command": [ - "prefect", - "server", - "database", - "upgrade", - "--yes", - ], - } - ], - "restartPolicy": "OnFailure", - }, - }, - }, - } - 
- -class PrefectSetting(BaseModel): - name: str - value: str - - def as_environment_variable(self) -> dict[str, str]: - return {"name": self.name, "value": self.value} - - -class PrefectServer(NamedResource): - version: str = Field(DEFAULT_PREFECT_VERSION) - sqlite: Optional[PrefectSqliteDatabase] = Field(None) - postgres: Optional[PrefectPostgresDatabase] = Field(None) - settings: list[PrefectSetting] = Field([]) - - def desired_deployment(self) -> dict[str, Any]: - container_template = { - "name": "prefect-server", - "image": f"prefecthq/prefect:{self.version}-python3.12", - "env": [ - { - "name": "PREFECT_HOME", - "value": "/var/lib/prefect/", - }, - *[s.as_environment_variable() for s in self.settings], - ], - "command": ["prefect", "server", "start", "--host", "0.0.0.0"], - "ports": [{"containerPort": 4200}], - "readinessProbe": { - "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, - "initialDelaySeconds": 10, - "periodSeconds": 5, - "timeoutSeconds": 5, - "successThreshold": 1, - "failureThreshold": 30, - }, - "livenessProbe": { - "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, - "initialDelaySeconds": 120, - "periodSeconds": 10, - "timeoutSeconds": 5, - "successThreshold": 1, - "failureThreshold": 2, - }, - } - - pod_template: dict[str, Any] = { - "metadata": {"labels": {"app": self.name}}, - "spec": { - "containers": [container_template], - }, - } - - deployment_spec = { - "replicas": 1, - "selector": {"matchLabels": {"app": self.name}}, - "template": pod_template, - } - - database = self.postgres or self.sqlite - if not database: - raise NotImplementedError("No database defined") - - database.configure_prefect_server(deployment_spec, container_template) - - return { - "apiVersion": "apps/v1", - "kind": "Deployment", - "metadata": {"namespace": self.namespace, "name": self.name}, - "spec": deployment_spec, - } - - def desired_stateful_set(self) -> dict[str, Any]: - container_template = { - "name": "prefect-server", - 
"image": f"prefecthq/prefect:{self.version}-python3.12", - "env": [ - { - "name": "PREFECT_HOME", - "value": "/var/lib/prefect/", - }, - *[s.as_environment_variable() for s in self.settings], - ], - "command": ["prefect", "server", "start", "--host", "0.0.0.0"], - "ports": [{"containerPort": 4200}], - "readinessProbe": { - "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, - "initialDelaySeconds": 10, - "periodSeconds": 5, - "timeoutSeconds": 5, - "successThreshold": 1, - "failureThreshold": 30, - }, - "livenessProbe": { - "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, - "initialDelaySeconds": 120, - "periodSeconds": 10, - "timeoutSeconds": 5, - "successThreshold": 1, - "failureThreshold": 2, - }, - } - - pod_template: dict[str, Any] = { - "metadata": {"labels": {"app": self.name}}, - "spec": { - "containers": [container_template], - }, - } - - stateful_set_spec = { - "replicas": 1, - "selector": {"matchLabels": {"app": self.name}}, - "template": pod_template, - } - - database = self.postgres or self.sqlite - if not database: - raise NotImplementedError("No database defined") - - database.configure_prefect_server(stateful_set_spec, container_template) - - return { - "apiVersion": "apps/v1", - "kind": "StatefulSet", - "metadata": {"namespace": self.namespace, "name": self.name}, - "spec": stateful_set_spec, - } - - def desired_service(self) -> dict[str, Any]: - return { - "apiVersion": "v1", - "kind": "Service", - "metadata": {"namespace": self.namespace, "name": self.name}, - "spec": { - "selector": {"app": self.name}, - "ports": [{"port": 4200, "protocol": "TCP"}], - }, - } - - -@kopf.on.resume("prefect.io", "v3", "prefectserver") -@kopf.on.create("prefect.io", "v3", "prefectserver") -@kopf.on.update("prefect.io", "v3", "prefectserver") -def reconcile_server( - namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_ -): - server = PrefectServer.model_validate( - spec, context={"name": name, "namespace": 
namespace} - ) - print(repr(server)) - - database = server.postgres or server.sqlite - if database: - api = kubernetes.client.BatchV1Api() - desired_database_migration = database.desired_database_migration_job(server) - if desired_database_migration: - try: - api.delete_namespaced_job( - name=desired_database_migration["metadata"]["name"], - namespace=namespace, - ) - except kubernetes.client.ApiException as e: - if e.status not in (404, 409): - raise - - while True: - try: - api.create_namespaced_job( - server.namespace, desired_database_migration - ) - break - except kubernetes.client.ApiException as e: - if e.status == 409: - time.sleep(1) - continue - raise - - api = kubernetes.client.AppsV1Api() - - if database.is_file_based: - desired_stateful_set = server.desired_stateful_set() - - try: - api.create_namespaced_stateful_set( - server.namespace, - desired_stateful_set, - ) - logger.info("Created stateful set %s", name) - except kubernetes.client.ApiException as e: - if e.status != 409: - raise - - api.replace_namespaced_stateful_set( - desired_stateful_set["metadata"]["name"], - server.namespace, - desired_stateful_set, - ) - logger.info("Updated stateful set %s", name) - else: - desired_deployment = server.desired_deployment() - - try: - api.create_namespaced_deployment( - server.namespace, - desired_deployment, - ) - logger.info("Created deployment %s", name) - except kubernetes.client.ApiException as e: - if e.status != 409: - raise - - api.replace_namespaced_deployment( - desired_deployment["metadata"]["name"], - server.namespace, - desired_deployment, - ) - logger.info("Updated deployment %s", name) - - desired_service = server.desired_service() - api = kubernetes.client.CoreV1Api() - try: - api.create_namespaced_service( - server.namespace, - desired_service, - ) - logger.info("Created service %s", name) - except kubernetes.client.ApiException as e: - if e.status != 409: - raise - - api.replace_namespaced_service( - desired_service["metadata"]["name"], - 
server.namespace, - desired_service, - ) - logger.info("Updated service %s", name) - - -@kopf.on.delete("prefect.io", "v3", "prefectserver") -def delete_server( - namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_ -): - server = PrefectServer.model_validate( - spec, context={"name": name, "namespace": namespace} - ) - print(repr(server)) - - api = kubernetes.client.BatchV1Api() - try: - api.delete_namespaced_job( - name=f"{server.name}-migrate", - namespace=namespace, - ) - except kubernetes.client.ApiException as e: - if e.status not in (404, 409): - raise - - api = kubernetes.client.AppsV1Api() - try: - api.delete_namespaced_stateful_set(name, namespace) - logger.info("Deleted stateful set %s", name) - except kubernetes.client.ApiException as e: - if e.status == 404: - logger.info("Stateful set %s not found", name) - else: - raise - - try: - api.delete_namespaced_deployment(name, namespace) - logger.info("Deleted deployment %s", name) - except kubernetes.client.ApiException as e: - if e.status == 404: - logger.info("Deployment %s not found", name) - else: - raise - - api = kubernetes.client.CoreV1Api() - try: - api.delete_namespaced_service(name, namespace) - logger.info("Deleted service %s", name) - except kubernetes.client.ApiException as e: - if e.status == 404: - logger.info("Service %s not found", name) - else: - raise - - -class PrefectServerReference(BaseModel): - namespace: str = Field("") - name: str - - @property - def as_environment_variable(self) -> dict[str, Any]: - return {"name": "PREFECT_API_URL", "value": self.in_cluster_api_url} - - @property - def in_cluster_api_url(self) -> str: - return f"http://{self.name}.{self.namespace}.svc:4200/api" - - @contextmanager - def client(self) -> Generator[httpx.Client, None, None]: - with httpx.Client(base_url=self.in_cluster_api_url) as c: - yield c - - -class PrefectWorkPool(NamedResource): - # TODO: can we get the version from the server version at runtime? 
- version: str = Field(DEFAULT_PREFECT_VERSION) - server: PrefectServerReference - workers: int = Field(1) - - @property - def work_pool_name(self) -> str: - return f"{self.namespace}:{self.name}" - - def desired_deployment(self) -> dict[str, Any]: - container_template = { - "name": "prefect-worker", - "image": f"prefecthq/prefect:{self.version}-python3.12-kubernetes", - "env": [ - self.server.as_environment_variable, - ], - "command": [ - "bash", - "-c", - ( - "prefect worker start --type kubernetes " - f"--pool '{ self.work_pool_name }' " - f'--name "{ self.namespace }:${{HOSTNAME}}"' - ), - ], - } - - pod_template: dict[str, Any] = { - "metadata": {"labels": {"app": self.name}}, - "spec": { - "containers": [container_template], - }, - } - - deployment_spec = { - "replicas": self.workers, - "selector": {"matchLabels": {"app": self.name}}, - "template": pod_template, - } - - return { - "apiVersion": "apps/v1", - "kind": "Deployment", - "metadata": {"namespace": self.namespace, "name": self.name}, - "spec": deployment_spec, - } - - -@kopf.on.resume("prefect.io", "v3", "prefectworkpool") -@kopf.on.create("prefect.io", "v3", "prefectworkpool") -@kopf.on.update("prefect.io", "v3", "prefectworkpool") -def reconcile_work_pool( - namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_ -): - work_pool = PrefectWorkPool.model_validate( - spec, context={"name": name, "namespace": namespace} - ) - print(repr(work_pool)) - - api = kubernetes.client.AppsV1Api() - desired_deployment = work_pool.desired_deployment() - - try: - api.create_namespaced_deployment( - work_pool.namespace, - desired_deployment, - ) - logger.info("Created deployment %s", name) - except kubernetes.client.ApiException as e: - if e.status != 409: - raise - - api.replace_namespaced_deployment( - desired_deployment["metadata"]["name"], - work_pool.namespace, - desired_deployment, - ) - logger.info("Updated deployment %s", name) - - -@kopf.on.delete("prefect.io", "v3", "prefectworkpool") 
-def delete_work_pool( - namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_ -): - work_pool = PrefectWorkPool.model_validate( - spec, context={"name": name, "namespace": namespace} - ) - print(repr(work_pool)) - - api = kubernetes.client.AppsV1Api() - try: - api.delete_namespaced_deployment(name, namespace) - logger.info("Deleted deployment %s", name) - except kubernetes.client.ApiException as e: - if e.status == 404: - logger.info("deployment %s not found", name) - else: - raise diff --git a/prefect_operator/__main__.py b/prefect_operator/__main__.py new file mode 100644 index 0000000..debb079 --- /dev/null +++ b/prefect_operator/__main__.py @@ -0,0 +1,57 @@ +import argparse +import importlib +import os +import sys + +import yaml + +from prefect_operator import __version__ +from prefect_operator.resources import CustomResource + +parser = argparse.ArgumentParser(description="Prefect Operator") + +subparsers = parser.add_subparsers(dest="command", help="Available commands") + +parser.add_argument("--version", action="version", version=__version__) +subparsers.add_parser("version", help="Print the version") + +run_parser = subparsers.add_parser("run", help="Run prefect-operator") + +generate_crds_parser = subparsers.add_parser( + "generate-crds", + help="Generate the Custom Resource Definitions", +) + +args = parser.parse_args() + +modules = [ + "prefect_operator.server", + "prefect_operator.work_pool", +] + + +def main(): + match args.command: + case "run": + os.execvp( + "kopf", + [ + "kopf", + "run", + "--all-namespaces", + ] + + [(f"--module={module}") for module in modules], + ) + case "generate-crds": + for module in modules: + importlib.import_module(module) + + yaml.dump_all(CustomResource.definitions(), stream=sys.stdout) + case "version": + print(__version__) + case _: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/prefect_operator/_version.py b/prefect_operator/_version.py new file mode 100644 index 
0000000..f79f470 --- /dev/null +++ b/prefect_operator/_version.py @@ -0,0 +1,17 @@ +# file generated by setuptools_scm +# don't change, don't track in version control +TYPE_CHECKING = False +if TYPE_CHECKING: + from typing import Tuple, Union + + VERSION_TUPLE = Tuple[Union[int, str], ...] +else: + VERSION_TUPLE = object + +version: str +__version__: str +__version_tuple__: VERSION_TUPLE +version_tuple: VERSION_TUPLE + +__version__ = version = "0.1.dev8+gdaee741.d20240801" +__version_tuple__ = version_tuple = (0, 1, "dev8", "gdaee741.d20240801") diff --git a/prefect_operator/resources.py b/prefect_operator/resources.py new file mode 100644 index 0000000..13ae8c2 --- /dev/null +++ b/prefect_operator/resources.py @@ -0,0 +1,116 @@ +from typing import Any, ClassVar, Iterable, Self + +from pydantic import BaseModel, PrivateAttr, ValidationInfo, model_validator + + +class CustomResource(BaseModel): + kind: ClassVar[str] + plural: ClassVar[str] + singular: ClassVar[str] + + @classmethod + def concrete_resources(cls) -> Iterable[type["CustomResource"]]: + if hasattr(cls, "kind"): + yield cls + for subclass in cls.__subclasses__(): + yield from subclass.concrete_resources() + + @classmethod + def definitions(cls) -> Iterable[dict[str, Any]]: + return [resource.definition() for resource in cls.concrete_resources()] + + @classmethod + def definition(cls) -> dict[str, Any]: + return { + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": {"name": f"{cls.plural}.prefect.io"}, + "spec": { + "group": "prefect.io", + "scope": "Namespaced", + "names": { + "kind": cls.kind, + "plural": cls.plural, + "singular": cls.singular, + }, + "versions": [ + { + "name": "v3", + "served": True, + "storage": True, + "schema": { + "openAPIV3Schema": { + "type": "object", + "properties": { + "spec": cls.model_json_schema_inlined(), + }, + } + }, + } + ], + }, + } + + @classmethod + def model_json_schema_inlined(cls) -> dict[str, Any]: + schema = 
cls.model_json_schema() + definitions = schema.pop("$defs") or {} + + def resolve_refs(obj: Any): + if isinstance(obj, dict): + if "$ref" in obj: + ref = obj["$ref"] + if isinstance(ref, str) and ref.startswith("#/$defs/"): + del obj["$ref"] + obj.update(definitions[ref.split("/")[-1]]) + + for v in obj.values(): + resolve_refs(v) + + if isinstance(obj, list): + for v in obj: + resolve_refs(v) + + def collapse_optionals(obj: Any): + if isinstance(obj, dict): + if ( + "anyOf" in obj + and len(obj["anyOf"]) == 2 + and obj["anyOf"][0]["type"] == "object" + and obj["anyOf"][1]["type"] == "null" + ): + any_of = obj.pop("anyOf") + obj.update(any_of[0]) + + for v in obj.values(): + collapse_optionals(v) + + if isinstance(obj, list): + for v in obj: + collapse_optionals(v) + + resolve_refs(definitions) + resolve_refs(schema) + collapse_optionals(schema) + + return schema + + +class NamedResource(BaseModel): + _name: str = PrivateAttr() + + @property + def name(self) -> str: + return self._name + + _namespace: str = PrivateAttr() + + @property + def namespace(self) -> str: + return self._namespace + + @model_validator(mode="after") + def set_name_and_namespace(self, validation_info: ValidationInfo) -> Self: + self._name = validation_info.context["name"] + self._namespace = validation_info.context["namespace"] + return self diff --git a/prefect_operator/server.py b/prefect_operator/server.py new file mode 100644 index 0000000..e9bbbf4 --- /dev/null +++ b/prefect_operator/server.py @@ -0,0 +1,433 @@ +import time +from typing import Any, ClassVar, Optional + +import kopf +import kubernetes +from pydantic import BaseModel, Field + +from . 
import DEFAULT_PREFECT_VERSION +from .resources import CustomResource, NamedResource + + +class PrefectSqliteDatabase(BaseModel): + storageClassName: str + size: str + + @property + def is_file_based(self) -> bool: + return True + + def configure_prefect_server( + self, + prefect_server_workload_spec: dict[str, Any], + prefect_server_container: dict[str, Any], + ) -> None: + prefect_server_container["env"].extend( + [ + { + "name": "PREFECT_API_DATABASE_MIGRATE_ON_START", + "value": "true", + }, + { + "name": "PREFECT_API_DATABASE_CONNECTION_URL", + "value": "sqlite+aiosqlite:////var/lib/prefect/prefect.db", + }, + ] + ) + prefect_server_container["volumeMounts"] = [ + { + "name": "database", + "mountPath": "/var/lib/prefect/", + } + ] + prefect_server_workload_spec["volumeClaimTemplates"] = [ + { + "metadata": {"name": "database"}, + "spec": { + "accessModes": ["ReadWriteOnce"], + "storageClassName": self.storageClassName, + "resources": {"requests": {"storage": self.size}}, + }, + } + ] + + def desired_database_migration_job( + self, server: "PrefectServer" + ) -> dict[str, Any] | None: + return None + + +class SecretKeyReference(BaseModel): + name: str + key: str + + +class PrefectPostgresDatabase(BaseModel): + host: str + port: int + user: str + passwordSecretKeyRef: SecretKeyReference + database: str + + @property + def is_file_based(self) -> bool: + return False + + def configure_prefect_server( + self, + prefect_server_workload_spec: dict[str, Any], + prefect_server_container: dict[str, Any], + ) -> None: + prefect_server_container["env"].extend( + [ + { + "name": "PREFECT_API_DATABASE_CONNECTION_URL", + "value": ( + "postgresql+asyncpg://" + f"{ self.user }:${{PREFECT_API_DATABASE_PASSWORD}}" + "@" + f"{ self.host }:{ self.port }" + "/" + f"{self.database}" + ), + }, + { + "name": "PREFECT_API_DATABASE_PASSWORD", + "valueFrom": { + "secretKeyRef": { + "name": self.passwordSecretKeyRef.name, + "key": self.passwordSecretKeyRef.key, + } + }, + }, + { + "name": 
"PREFECT_API_DATABASE_MIGRATE_ON_START", + "value": "false", + }, + ] + ) + + def desired_database_migration_job(self, server: "PrefectServer") -> dict[str, Any]: + return { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "namespace": server.namespace, + "name": f"{server.name}-migrate", + }, + "spec": { + "template": { + "metadata": {"labels": {"app": server.name}}, + "spec": { + "containers": [ + { + "name": "migrate", + "image": f"prefecthq/prefect:{server.version}-python3.12", + "env": [ + s.as_environment_variable() for s in server.settings + ], + "command": [ + "prefect", + "server", + "database", + "upgrade", + "--yes", + ], + } + ], + "restartPolicy": "OnFailure", + }, + }, + }, + } + + +class PrefectSetting(BaseModel): + name: str + value: str + + def as_environment_variable(self) -> dict[str, str]: + return {"name": self.name, "value": self.value} + + +class PrefectServer(CustomResource, NamedResource): + kind: ClassVar[str] = "PrefectServer" + plural: ClassVar[str] = "prefectservers" + singular: ClassVar[str] = "prefectserver" + + version: str = Field(DEFAULT_PREFECT_VERSION) + sqlite: Optional[PrefectSqliteDatabase] = Field(None) + postgres: Optional[PrefectPostgresDatabase] = Field(None) + settings: list[PrefectSetting] = Field([]) + + def desired_deployment(self) -> dict[str, Any]: + container_template = { + "name": "prefect-server", + "image": f"prefecthq/prefect:{self.version}-python3.12", + "env": [ + { + "name": "PREFECT_HOME", + "value": "/var/lib/prefect/", + }, + *[s.as_environment_variable() for s in self.settings], + ], + "command": ["prefect", "server", "start", "--host", "0.0.0.0"], + "ports": [{"containerPort": 4200}], + "readinessProbe": { + "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, + "initialDelaySeconds": 10, + "periodSeconds": 5, + "timeoutSeconds": 5, + "successThreshold": 1, + "failureThreshold": 30, + }, + "livenessProbe": { + "httpGet": {"path": "/api/health", "port": 4200, "scheme": 
"HTTP"}, + "initialDelaySeconds": 120, + "periodSeconds": 10, + "timeoutSeconds": 5, + "successThreshold": 1, + "failureThreshold": 2, + }, + } + + pod_template: dict[str, Any] = { + "metadata": {"labels": {"app": self.name}}, + "spec": { + "containers": [container_template], + }, + } + + deployment_spec = { + "replicas": 1, + "selector": {"matchLabels": {"app": self.name}}, + "template": pod_template, + } + + database = self.postgres or self.sqlite + if not database: + raise NotImplementedError("No database defined") + + database.configure_prefect_server(deployment_spec, container_template) + + return { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": {"namespace": self.namespace, "name": self.name}, + "spec": deployment_spec, + } + + def desired_stateful_set(self) -> dict[str, Any]: + container_template = { + "name": "prefect-server", + "image": f"prefecthq/prefect:{self.version}-python3.12", + "env": [ + { + "name": "PREFECT_HOME", + "value": "/var/lib/prefect/", + }, + *[s.as_environment_variable() for s in self.settings], + ], + "command": ["prefect", "server", "start", "--host", "0.0.0.0"], + "ports": [{"containerPort": 4200}], + "readinessProbe": { + "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, + "initialDelaySeconds": 10, + "periodSeconds": 5, + "timeoutSeconds": 5, + "successThreshold": 1, + "failureThreshold": 30, + }, + "livenessProbe": { + "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, + "initialDelaySeconds": 120, + "periodSeconds": 10, + "timeoutSeconds": 5, + "successThreshold": 1, + "failureThreshold": 2, + }, + } + + pod_template: dict[str, Any] = { + "metadata": {"labels": {"app": self.name}}, + "spec": { + "containers": [container_template], + }, + } + + stateful_set_spec = { + "replicas": 1, + "selector": {"matchLabels": {"app": self.name}}, + "template": pod_template, + } + + database = self.postgres or self.sqlite + if not database: + raise NotImplementedError("No database defined") + 
+ database.configure_prefect_server(stateful_set_spec, container_template) + + return { + "apiVersion": "apps/v1", + "kind": "StatefulSet", + "metadata": {"namespace": self.namespace, "name": self.name}, + "spec": stateful_set_spec, + } + + def desired_service(self) -> dict[str, Any]: + return { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"namespace": self.namespace, "name": self.name}, + "spec": { + "selector": {"app": self.name}, + "ports": [{"port": 4200, "protocol": "TCP"}], + }, + } + + +@kopf.on.resume("prefect.io", "v3", "prefectserver") +@kopf.on.create("prefect.io", "v3", "prefectserver") +@kopf.on.update("prefect.io", "v3", "prefectserver") +def reconcile_server( + namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_ +): + server = PrefectServer.model_validate( + spec, context={"name": name, "namespace": namespace} + ) + print(repr(server)) + + database = server.postgres or server.sqlite + if database: + api = kubernetes.client.BatchV1Api() + desired_database_migration = database.desired_database_migration_job(server) + if desired_database_migration: + try: + api.delete_namespaced_job( + name=desired_database_migration["metadata"]["name"], + namespace=namespace, + ) + except kubernetes.client.ApiException as e: + if e.status not in (404, 409): + raise + + while True: + try: + api.create_namespaced_job( + server.namespace, desired_database_migration + ) + break + except kubernetes.client.ApiException as e: + if e.status == 409: + time.sleep(1) + continue + raise + + api = kubernetes.client.AppsV1Api() + + if database.is_file_based: + desired_stateful_set = server.desired_stateful_set() + + try: + api.create_namespaced_stateful_set( + server.namespace, + desired_stateful_set, + ) + logger.info("Created stateful set %s", name) + except kubernetes.client.ApiException as e: + if e.status != 409: + raise + + api.replace_namespaced_stateful_set( + desired_stateful_set["metadata"]["name"], + server.namespace, + 
desired_stateful_set, + ) + logger.info("Updated stateful set %s", name) + else: + desired_deployment = server.desired_deployment() + + try: + api.create_namespaced_deployment( + server.namespace, + desired_deployment, + ) + logger.info("Created deployment %s", name) + except kubernetes.client.ApiException as e: + if e.status != 409: + raise + + api.replace_namespaced_deployment( + desired_deployment["metadata"]["name"], + server.namespace, + desired_deployment, + ) + logger.info("Updated deployment %s", name) + + desired_service = server.desired_service() + api = kubernetes.client.CoreV1Api() + try: + api.create_namespaced_service( + server.namespace, + desired_service, + ) + logger.info("Created service %s", name) + except kubernetes.client.ApiException as e: + if e.status != 409: + raise + + api.replace_namespaced_service( + desired_service["metadata"]["name"], + server.namespace, + desired_service, + ) + logger.info("Updated service %s", name) + + +@kopf.on.delete("prefect.io", "v3", "prefectserver") +def delete_server( + namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_ +): + server = PrefectServer.model_validate( + spec, context={"name": name, "namespace": namespace} + ) + print(repr(server)) + + api = kubernetes.client.BatchV1Api() + try: + api.delete_namespaced_job( + name=f"{server.name}-migrate", + namespace=namespace, + ) + except kubernetes.client.ApiException as e: + if e.status not in (404, 409): + raise + + api = kubernetes.client.AppsV1Api() + try: + api.delete_namespaced_stateful_set(name, namespace) + logger.info("Deleted stateful set %s", name) + except kubernetes.client.ApiException as e: + if e.status == 404: + logger.info("Stateful set %s not found", name) + else: + raise + + try: + api.delete_namespaced_deployment(name, namespace) + logger.info("Deleted deployment %s", name) + except kubernetes.client.ApiException as e: + if e.status == 404: + logger.info("Deployment %s not found", name) + else: + raise + + api = 
class PrefectServerReference(BaseModel):
    """Reference to the Prefect server that a work pool's workers talk to.

    The reference is resolved to an in-cluster URL on port 4200 under the
    ``/api`` path.
    """

    # Namespace used in the server's in-cluster hostname; "" when not given.
    namespace: str = Field("")
    # Name used as the first label of the server's in-cluster hostname.
    name: str

    @property
    def as_environment_variable(self) -> dict[str, Any]:
        """Return a container ``env`` entry setting ``PREFECT_API_URL``."""
        return {"name": "PREFECT_API_URL", "value": self.in_cluster_api_url}

    @property
    def in_cluster_api_url(self) -> str:
        """Return the in-cluster HTTP URL of the referenced server's API.

        With a namespace the host is ``<name>.<namespace>.svc``. Without one
        the bare ``<name>`` is used, because interpolating the empty default
        would yield ``<name>..svc``, which is not a valid hostname.
        """
        if self.namespace:
            host = f"{self.name}.{self.namespace}.svc"
        else:
            # Assumes the client runs in the same namespace as the server,
            # where cluster DNS resolves the short service name - TODO confirm.
            host = self.name
        return f"http://{host}:4200/api"

    @contextmanager
    def client(self) -> Generator[httpx.Client, None, None]:
        """Yield an ``httpx.Client`` rooted at the server's API URL.

        The client is closed when the context exits.
        """
        with httpx.Client(base_url=self.in_cluster_api_url) as c:
            yield c
@kopf.on.resume("prefect.io", "v3", "prefectworkpool")
@kopf.on.create("prefect.io", "v3", "prefectworkpool")
@kopf.on.update("prefect.io", "v3", "prefectworkpool")
def reconcile_work_pool(
    namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_
):
    """Create the worker Deployment for a work pool, or replace it if present.

    Runs on resume, create and update of a PrefectWorkPool resource. Any API
    error other than a 409 on create is propagated to kopf.
    """
    validation_context = {"name": name, "namespace": namespace}
    work_pool = PrefectWorkPool.model_validate(spec, context=validation_context)
    print(repr(work_pool))

    apps_api = kubernetes.client.AppsV1Api()
    manifest = work_pool.desired_deployment()
    target_namespace = work_pool.namespace

    try:
        apps_api.create_namespaced_deployment(target_namespace, manifest)
    except kubernetes.client.ApiException as error:
        # Only "already exists" is recoverable; everything else is re-raised.
        if error.status != 409:
            raise
        deployment_name = manifest["metadata"]["name"]
        apps_api.replace_namespaced_deployment(
            deployment_name, target_namespace, manifest
        )
        logger.info("Updated deployment %s", name)
    else:
        logger.info("Created deployment %s", name)
def delete_work_pool(
    namespace: str, name: str, spec: dict[str, Any], logger: kopf.Logger, **_
):
    """Delete the worker Deployment that backs a work pool.

    Only ``name`` and ``namespace`` are needed to find the Deployment, so the
    spec is deliberately not validated here: a resource whose stored spec no
    longer validates must still be deletable.

    Args:
        namespace: Namespace of the PrefectWorkPool resource.
        name: Name of the PrefectWorkPool resource (and of its Deployment).
        spec: Raw ``spec`` of the resource; accepted for the handler
            signature but not used.
        logger: Per-object logger supplied by kopf.

    Raises:
        kubernetes.client.ApiException: Any API failure other than 404.
    """
    logger.debug("Deleting work pool %s/%s", namespace, name)

    api = kubernetes.client.AppsV1Api()
    try:
        api.delete_namespaced_deployment(name, namespace)
    except kubernetes.client.ApiException as e:
        # An already-missing Deployment is the desired end state.
        if e.status != 404:
            raise
        logger.info("Deployment %s not found", name)
    else:
        logger.info("Deleted deployment %s", name)