diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5fdaddb..d6b2242 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,6 +17,10 @@ repos: hooks: - id: mypy additional_dependencies: [pytest==8.3.2, types-pyyaml==6.0.12.20240724] + - repo: https://github.com/codespell-project/codespell + rev: v2.2.6 + hooks: + - id: codespell - repo: local hooks: - id: generate-crds diff --git a/Makefile b/Makefile index 65b4cb0..2847713 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ requirements-dev.txt: requirements.txt requirements-dev.in .bookkeeping/uv .PHONY: docker docker: Dockerfile .dockerignore requirements.txt - docker build -t prefect-operator:latest . + docker build -t PrefectHQ/prefect-operator:latest . .PHONY: install install: .bookkeeping/development.txt .git/hooks/pre-commit .pre-commit-config.yaml docker diff --git a/deploy-example b/deploy-example index 8febf57..f1172e3 100755 --- a/deploy-example +++ b/deploy-example @@ -1,7 +1,22 @@ #!/bin/bash +kubectl apply -f - << NAMESPACE +apiVersion: v1 +kind: Namespace +metadata: + name: prefect-operator +NAMESPACE + +make docker +kubectl -n prefect-operator apply -f operator.yaml +kubectl -n prefect-operator rollout restart deployment prefect-operator + python -m prefect_operator generate-crds > crds.yaml kubectl apply -f crds.yaml +if [ -z "$1" ]; then + exit 0 +fi + for f in examples/$1/*.yaml; do kubectl apply -f $f done diff --git a/operator.yaml b/operator.yaml new file mode 100644 index 0000000..5628130 --- /dev/null +++ b/operator.yaml @@ -0,0 +1,93 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: prefect-operator-role-cluster +rules: + + # Kopf framework: knowing which other operators are running (i.e. 
peering) + - apiGroups: [kopf.dev] + resources: [clusterkopfpeerings] + verbs: [list, watch, patch, get] + + # Kopf framework: runtime observation of namespaces & CRDs (addition/deletion) + - apiGroups: [apiextensions.k8s.io] + resources: [customresourcedefinitions] + verbs: [list, watch] + - apiGroups: [""] + resources: [namespaces] + verbs: [list, watch] + + # Kopf framework: admission webhook configuration management + - apiGroups: [admissionregistration.k8s.io/v1, admissionregistration.k8s.io/v1beta1] + resources: [validatingwebhookconfigurations, mutatingwebhookconfigurations] + verbs: [create, patch] + + # Kopf framework: events + - apiGroups: [""] + resources: [events] + verbs: [create] + + # Prefect operator: list/get/watch/patch access to prefect-operator CRDs cluster-wide + - apiGroups: [prefect.io] + resources: [prefectservers, prefectworkpools] + verbs: [list, get, watch, patch] + + # Prefect operator: write access to deployments, services, etc + - apiGroups: [""] + resources: [services, persistentvolumeclaims] + verbs: [list, get, create, update, patch, delete] + + - apiGroups: ["apps"] + resources: [deployments] + verbs: [list, get, create, update, patch, delete] + + - apiGroups: ["batch"] + resources: [jobs] + verbs: [list, get, create, update, patch, delete] + +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + namespace: prefect-operator + name: prefect-operator + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: prefect-operator-rolebinding-cluster +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: prefect-operator-role-cluster +subjects: + - kind: ServiceAccount + namespace: prefect-operator + name: prefect-operator + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: prefect-operator + name: prefect-operator +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + app: prefect-operator + template: + metadata: + labels: + app: 
prefect-operator + spec: + serviceAccountName: prefect-operator + containers: + - name: operator + image: PrefectHQ/prefect-operator:latest + imagePullPolicy: IfNotPresent diff --git a/src/prefect_operator/server.py b/src/prefect_operator/server.py index e9bbbf4..18d42e0 100644 --- a/src/prefect_operator/server.py +++ b/src/prefect_operator/server.py @@ -17,11 +17,32 @@ class PrefectSqliteDatabase(BaseModel): def is_file_based(self) -> bool: return True + def desired_persistent_volume_claim( + self, server: "PrefectServer" + ) -> dict[str, Any] | None: + return { + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": { + "namespace": server.namespace, + "name": f"{server.name}-database", + }, + "spec": { + "storageClassName": self.storageClassName, + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": self.size}}, + }, + } + def configure_prefect_server( self, + server: "PrefectServer", prefect_server_workload_spec: dict[str, Any], prefect_server_container: dict[str, Any], ) -> None: + prefect_server_workload_spec["replicas"] = 1 + prefect_server_workload_spec["strategy"] = {"type": "Recreate"} + prefect_server_container["env"].extend( [ { @@ -40,14 +61,10 @@ def configure_prefect_server( "mountPath": "/var/lib/prefect/", } ] - prefect_server_workload_spec["volumeClaimTemplates"] = [ + prefect_server_workload_spec["template"]["spec"]["volumes"] = [ { - "metadata": {"name": "database"}, - "spec": { - "accessModes": ["ReadWriteOnce"], - "storageClassName": self.storageClassName, - "resources": {"requests": {"storage": self.size}}, - }, + "name": "database", + "persistentVolumeClaim": {"claimName": f"{server.name}-database"}, } ] @@ -73,8 +90,14 @@ class PrefectPostgresDatabase(BaseModel): def is_file_based(self) -> bool: return False + def desired_persistent_volume_claim( + self, server: "PrefectServer" + ) -> dict[str, Any] | None: + return None + def configure_prefect_server( self, + server: "PrefectServer", 
prefect_server_workload_spec: dict[str, Any], prefect_server_container: dict[str, Any], ) -> None: @@ -208,7 +231,7 @@ def desired_deployment(self) -> dict[str, Any]: if not database: raise NotImplementedError("No database defined") - database.configure_prefect_server(deployment_spec, container_template) + database.configure_prefect_server(self, deployment_spec, container_template) return { "apiVersion": "apps/v1", @@ -217,63 +240,6 @@ def desired_deployment(self) -> dict[str, Any]: "spec": deployment_spec, } - def desired_stateful_set(self) -> dict[str, Any]: - container_template = { - "name": "prefect-server", - "image": f"prefecthq/prefect:{self.version}-python3.12", - "env": [ - { - "name": "PREFECT_HOME", - "value": "/var/lib/prefect/", - }, - *[s.as_environment_variable() for s in self.settings], - ], - "command": ["prefect", "server", "start", "--host", "0.0.0.0"], - "ports": [{"containerPort": 4200}], - "readinessProbe": { - "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, - "initialDelaySeconds": 10, - "periodSeconds": 5, - "timeoutSeconds": 5, - "successThreshold": 1, - "failureThreshold": 30, - }, - "livenessProbe": { - "httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"}, - "initialDelaySeconds": 120, - "periodSeconds": 10, - "timeoutSeconds": 5, - "successThreshold": 1, - "failureThreshold": 2, - }, - } - - pod_template: dict[str, Any] = { - "metadata": {"labels": {"app": self.name}}, - "spec": { - "containers": [container_template], - }, - } - - stateful_set_spec = { - "replicas": 1, - "selector": {"matchLabels": {"app": self.name}}, - "template": pod_template, - } - - database = self.postgres or self.sqlite - if not database: - raise NotImplementedError("No database defined") - - database.configure_prefect_server(stateful_set_spec, container_template) - - return { - "apiVersion": "apps/v1", - "kind": "StatefulSet", - "metadata": {"namespace": self.namespace, "name": self.name}, - "spec": stateful_set_spec, - } - def 
desired_service(self) -> dict[str, Any]: return { "apiVersion": "v1", @@ -323,46 +289,34 @@ def reconcile_server( continue raise - api = kubernetes.client.AppsV1Api() - - if database.is_file_based: - desired_stateful_set = server.desired_stateful_set() - + desired_persistent_volume_claim = database.desired_persistent_volume_claim(server) + if desired_persistent_volume_claim: + api = kubernetes.client.CoreV1Api() try: - api.create_namespaced_stateful_set( + api.create_namespaced_persistent_volume_claim( server.namespace, - desired_stateful_set, + desired_persistent_volume_claim, ) - logger.info("Created stateful set %s", name) + logger.info("Created persistent volume claim %s", name) except kubernetes.client.ApiException as e: if e.status != 409: raise - api.replace_namespaced_stateful_set( - desired_stateful_set["metadata"]["name"], - server.namespace, - desired_stateful_set, - ) - logger.info("Updated stateful set %s", name) - else: - desired_deployment = server.desired_deployment() - - try: - api.create_namespaced_deployment( - server.namespace, - desired_deployment, - ) - logger.info("Created deployment %s", name) - except kubernetes.client.ApiException as e: - if e.status != 409: - raise + api = kubernetes.client.AppsV1Api() + desired_deployment = server.desired_deployment() + try: + api.create_namespaced_deployment(server.namespace, desired_deployment) + logger.info("Created deployment %s", name) + except kubernetes.client.ApiException as e: + if e.status != 409: + raise - api.replace_namespaced_deployment( - desired_deployment["metadata"]["name"], - server.namespace, - desired_deployment, - ) - logger.info("Updated deployment %s", name) + api.replace_namespaced_deployment( + desired_deployment["metadata"]["name"], + server.namespace, + desired_deployment, + ) + logger.info("Updated deployment %s", name) desired_service = server.desired_service() api = kubernetes.client.CoreV1Api() @@ -404,15 +358,6 @@ def delete_server( raise api = 
kubernetes.client.AppsV1Api() - try: - api.delete_namespaced_stateful_set(name, namespace) - logger.info("Deleted stateful set %s", name) - except kubernetes.client.ApiException as e: - if e.status == 404: - logger.info("Stateful set %s not found", name) - else: - raise - try: api.delete_namespaced_deployment(name, namespace) logger.info("Deleted deployment %s", name) diff --git a/sync-pre-commit b/sync-pre-commit index 7794786..8c20bf3 100755 --- a/sync-pre-commit +++ b/sync-pre-commit @@ -67,7 +67,7 @@ def resolve_repo_versions(pinned: dict[str, str]) -> dict[str, str]: def resolve_mypy_dependencies(pinned: dict[str, str], mypy: set[str]) -> list[str]: - """Given the pinned development dependencies and the requsted mypy dependencies, + """Given the pinned development dependencies and the requested mypy dependencies, resolve them into pip version specifiers (like "mypackage>=1.2.3")""" resolved = {f"{dep}{pinned[dep]}" for dep in mypy if dep in pinned}