Skip to content

Commit

Permalink
Adds an operator manifest example for running on Kubernetes (#17)
Browse files Browse the repository at this point in the history
* Adds an operator manifest example for running on Kubernetes

This new operator.yaml will become the template for a versioned release asset
when we start cutting releases.  That way we can include the fully-specified
`PrefectHQ/prefect-operator` version tag in a place that people can permanently
reference.

This change also switches the SQLite version of the server from a `StatefulSet`
to a `Deployment` with an explicit PVC, fixed replicas, and the `Recreate`
rollout strategy.  This makes it easier to reason about what the server
deployment will look like and simplifies the code.

Closes #8

* Typo
  • Loading branch information
chrisguidry authored Aug 2, 2024
1 parent f0ae62b commit 2f3fa17
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 108 deletions.
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ repos:
hooks:
- id: mypy
additional_dependencies: [pytest==8.3.2, types-pyyaml==6.0.12.20240724]
- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
hooks:
- id: codespell
- repo: local
hooks:
- id: generate-crds
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ requirements-dev.txt: requirements.txt requirements-dev.in .bookkeeping/uv

.PHONY: docker
# Build the operator image from the local Dockerfile.
# Docker rejects repository names that contain uppercase letters
# ("repository name must be lowercase"), so the namespaced tag is spelled
# in lowercase.
docker: Dockerfile .dockerignore requirements.txt
	docker build -t prefect-operator:latest .
	docker build -t prefecthq/prefect-operator:latest .

.PHONY: install
install: .bookkeeping/development.txt .git/hooks/pre-commit .pre-commit-config.yaml docker
Expand Down
15 changes: 15 additions & 0 deletions deploy-example
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
#!/bin/bash
# Build and deploy the operator into the current kubectl context, then
# optionally apply one of the example directories.
#
# Usage: ./deploy-example [EXAMPLE]
#   EXAMPLE  name of a directory under examples/; every *.yaml file in it is
#            applied. When omitted, only the operator and CRDs are deployed.

# Make sure the operator's namespace exists before anything is applied into it.
kubectl apply -f - << NAMESPACE
apiVersion: v1
kind: Namespace
metadata:
  name: prefect-operator
NAMESPACE

# Rebuild the image, apply the operator manifest, and restart the deployment
# so the pod is recreated after the rebuild.
make docker
kubectl -n prefect-operator apply -f operator.yaml
kubectl -n prefect-operator rollout restart deployment prefect-operator

# Regenerate the CRDs from the Python models and install them.
python -m prefect_operator generate-crds > crds.yaml
kubectl apply -f crds.yaml

# ${1:-} keeps the script usable when no argument is given.
example="${1:-}"
if [ -z "$example" ]; then
    exit 0
fi

# Quote the directory and each path so names containing spaces or glob
# characters are passed to kubectl intact; only the *.yaml part is a glob.
for f in "examples/$example"/*.yaml; do
    kubectl apply -f "$f"
done
93 changes: 93 additions & 0 deletions operator.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Cluster-wide permissions for the operator: the rules the Kopf framework
# needs, plus the resources the Prefect operator itself manages.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: prefect-operator-role-cluster
rules:

  # Kopf framework: knowing which other operators are running (i.e. peering)
  - apiGroups: [kopf.dev]
    resources: [clusterkopfpeerings]
    verbs: [list, watch, patch, get]

  # Kopf framework: runtime observation of namespaces & CRDs (addition/deletion)
  - apiGroups: [apiextensions.k8s.io]
    resources: [customresourcedefinitions]
    verbs: [list, watch]
  - apiGroups: [""]
    resources: [namespaces]
    verbs: [list, watch]

  # Kopf framework: admission webhook configuration management
  # RBAC rules match on the bare API group name; a "group/version" string
  # never matches any request, so the version suffixes are omitted here.
  - apiGroups: [admissionregistration.k8s.io]
    resources: [validatingwebhookconfigurations, mutatingwebhookconfigurations]
    verbs: [create, patch]

  # Kopf framework: events
  - apiGroups: [""]
    resources: [events]
    verbs: [create]

  # Prefect operator: watching the Prefect custom resources cluster-wide.
  # This is not read-only: `patch` is granted as well (presumably so Kopf can
  # record finalizers and handler state on the objects; confirm).
  - apiGroups: [prefect.io]
    resources: [prefectservers, prefectworkpools]
    verbs: [list, get, watch, patch]

  # Prefect operator: write access to deployments, services, etc
  - apiGroups: [""]
    resources: [services, persistentvolumeclaims]
    verbs: [list, get, create, update, patch, delete]

  - apiGroups: ["apps"]
    resources: [deployments]
    verbs: [list, get, create, update, patch, delete]

  - apiGroups: ["batch"]
    resources: [jobs]
    verbs: [list, get, create, update, patch, delete]

---

# ServiceAccount in the prefect-operator namespace; the operator Deployment
# in this manifest sets serviceAccountName to this account.
apiVersion: v1
kind: ServiceAccount
metadata:
namespace: prefect-operator
name: prefect-operator

---
# Binds the prefect-operator-role-cluster ClusterRole to the prefect-operator
# ServiceAccount in the prefect-operator namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: prefect-operator-rolebinding-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: prefect-operator-role-cluster
subjects:
- kind: ServiceAccount
namespace: prefect-operator
name: prefect-operator

---

# The operator itself: one replica, replaced rather than rolled
# (strategy Recreate).
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: prefect-operator
  name: prefect-operator
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: prefect-operator
  template:
    metadata:
      labels:
        app: prefect-operator
    spec:
      serviceAccountName: prefect-operator
      containers:
        - name: operator
          # Image repository names must be lowercase; a reference with
          # uppercase letters is rejected as an invalid image name.
          image: prefecthq/prefect-operator:latest
          imagePullPolicy: IfNotPresent
157 changes: 51 additions & 106 deletions src/prefect_operator/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,32 @@ class PrefectSqliteDatabase(BaseModel):
def is_file_based(self) -> bool:
return True

def desired_persistent_volume_claim(
self, server: "PrefectServer"
) -> dict[str, Any] | None:
return {
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"namespace": server.namespace,
"name": f"{server.name}-database",
},
"spec": {
"storageClassName": self.storageClassName,
"accessModes": ["ReadWriteOnce"],
"resources": {"requests": {"storage": self.size}},
},
}

def configure_prefect_server(
self,
server: "PrefectServer",
prefect_server_workload_spec: dict[str, Any],
prefect_server_container: dict[str, Any],
) -> None:
prefect_server_workload_spec["replicas"] = 1
prefect_server_workload_spec["strategy"] = {"type": "Recreate"}

prefect_server_container["env"].extend(
[
{
Expand All @@ -40,14 +61,10 @@ def configure_prefect_server(
"mountPath": "/var/lib/prefect/",
}
]
prefect_server_workload_spec["volumeClaimTemplates"] = [
prefect_server_workload_spec["template"]["spec"]["volumes"] = [
{
"metadata": {"name": "database"},
"spec": {
"accessModes": ["ReadWriteOnce"],
"storageClassName": self.storageClassName,
"resources": {"requests": {"storage": self.size}},
},
"name": "database",
"persistentVolumeClaim": {"claimName": f"{server.name}-database"},
}
]

Expand All @@ -73,8 +90,14 @@ class PrefectPostgresDatabase(BaseModel):
def is_file_based(self) -> bool:
return False

def desired_persistent_volume_claim(
self, server: "PrefectServer"
) -> dict[str, Any] | None:
return None

def configure_prefect_server(
self,
server: "PrefectServer",
prefect_server_workload_spec: dict[str, Any],
prefect_server_container: dict[str, Any],
) -> None:
Expand Down Expand Up @@ -208,7 +231,7 @@ def desired_deployment(self) -> dict[str, Any]:
if not database:
raise NotImplementedError("No database defined")

database.configure_prefect_server(deployment_spec, container_template)
database.configure_prefect_server(self, deployment_spec, container_template)

return {
"apiVersion": "apps/v1",
Expand All @@ -217,63 +240,6 @@ def desired_deployment(self) -> dict[str, Any]:
"spec": deployment_spec,
}

def desired_stateful_set(self) -> dict[str, Any]:
container_template = {
"name": "prefect-server",
"image": f"prefecthq/prefect:{self.version}-python3.12",
"env": [
{
"name": "PREFECT_HOME",
"value": "/var/lib/prefect/",
},
*[s.as_environment_variable() for s in self.settings],
],
"command": ["prefect", "server", "start", "--host", "0.0.0.0"],
"ports": [{"containerPort": 4200}],
"readinessProbe": {
"httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"},
"initialDelaySeconds": 10,
"periodSeconds": 5,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 30,
},
"livenessProbe": {
"httpGet": {"path": "/api/health", "port": 4200, "scheme": "HTTP"},
"initialDelaySeconds": 120,
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 2,
},
}

pod_template: dict[str, Any] = {
"metadata": {"labels": {"app": self.name}},
"spec": {
"containers": [container_template],
},
}

stateful_set_spec = {
"replicas": 1,
"selector": {"matchLabels": {"app": self.name}},
"template": pod_template,
}

database = self.postgres or self.sqlite
if not database:
raise NotImplementedError("No database defined")

database.configure_prefect_server(stateful_set_spec, container_template)

return {
"apiVersion": "apps/v1",
"kind": "StatefulSet",
"metadata": {"namespace": self.namespace, "name": self.name},
"spec": stateful_set_spec,
}

def desired_service(self) -> dict[str, Any]:
return {
"apiVersion": "v1",
Expand Down Expand Up @@ -323,46 +289,34 @@ def reconcile_server(
continue
raise

api = kubernetes.client.AppsV1Api()

if database.is_file_based:
desired_stateful_set = server.desired_stateful_set()

desired_persistent_volume_claim = database.desired_persistent_volume_claim(server)
if desired_persistent_volume_claim:
api = kubernetes.client.CoreV1Api()
try:
api.create_namespaced_stateful_set(
api.create_namespaced_persistent_volume_claim(
server.namespace,
desired_stateful_set,
desired_persistent_volume_claim,
)
logger.info("Created stateful set %s", name)
logger.info("Created persistent volume claim %s", name)
except kubernetes.client.ApiException as e:
if e.status != 409:
raise

api.replace_namespaced_stateful_set(
desired_stateful_set["metadata"]["name"],
server.namespace,
desired_stateful_set,
)
logger.info("Updated stateful set %s", name)
else:
desired_deployment = server.desired_deployment()

try:
api.create_namespaced_deployment(
server.namespace,
desired_deployment,
)
logger.info("Created deployment %s", name)
except kubernetes.client.ApiException as e:
if e.status != 409:
raise
api = kubernetes.client.AppsV1Api()
desired_deployment = server.desired_deployment()
try:
api.create_namespaced_deployment(server.namespace, desired_deployment)
logger.info("Created deployment %s", name)
except kubernetes.client.ApiException as e:
if e.status != 409:
raise

api.replace_namespaced_deployment(
desired_deployment["metadata"]["name"],
server.namespace,
desired_deployment,
)
logger.info("Updated deployment %s", name)
api.replace_namespaced_deployment(
desired_deployment["metadata"]["name"],
server.namespace,
desired_deployment,
)
logger.info("Updated deployment %s", name)

desired_service = server.desired_service()
api = kubernetes.client.CoreV1Api()
Expand Down Expand Up @@ -404,15 +358,6 @@ def delete_server(
raise

api = kubernetes.client.AppsV1Api()
try:
api.delete_namespaced_stateful_set(name, namespace)
logger.info("Deleted stateful set %s", name)
except kubernetes.client.ApiException as e:
if e.status == 404:
logger.info("Stateful set %s not found", name)
else:
raise

try:
api.delete_namespaced_deployment(name, namespace)
logger.info("Deleted deployment %s", name)
Expand Down
2 changes: 1 addition & 1 deletion sync-pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def resolve_repo_versions(pinned: dict[str, str]) -> dict[str, str]:


def resolve_mypy_dependencies(pinned: dict[str, str], mypy: set[str]) -> list[str]:
"""Given the pinned development dependencies and the requsted mypy dependencies,
"""Given the pinned development dependencies and the requested mypy dependencies,
resolve them into pip version specifiers (like "mypackage>=1.2.3")"""
resolved = {f"{dep}{pinned[dep]}" for dep in mypy if dep in pinned}

Expand Down

0 comments on commit 2f3fa17

Please sign in to comment.