From 1b5d839989e52308746fd8aba1eb359cf2e538dc Mon Sep 17 00:00:00 2001 From: Alessandro De Maria Date: Tue, 17 Dec 2024 12:35:18 +0000 Subject: [PATCH] Improve generators --- kubernetes/argocd.py | 49 ++------ kubernetes/autoscaling.py | 28 ++++- kubernetes/common.py | 136 ++++++++++++++++++++-- kubernetes/files.py | 17 +++ kubernetes/istio.py | 17 ++- kubernetes/networking.py | 41 ++++++- kubernetes/rbac.py | 233 ++++++++++++++++++++++++++++++++------ kubernetes/storage.py | 32 ++++-- kubernetes/workloads.py | 163 ++++++++++++++++---------- 9 files changed, 559 insertions(+), 157 deletions(-) create mode 100644 kubernetes/files.py diff --git a/kubernetes/argocd.py b/kubernetes/argocd.py index bf66cfd..647b240 100644 --- a/kubernetes/argocd.py +++ b/kubernetes/argocd.py @@ -10,10 +10,10 @@ class ArgoCDApplicationConfigSpec(KubernetesResourceSpec): project: str = "default" - destination: Dict = None + destination: dict = None source: dict sync_policy: dict = None - ignore_differences: dict = None + ignore_differences: list[dict] = None class ArgoCDApplication(KubernetesResource): @@ -27,7 +27,7 @@ def body(self): self.root.spec.project = self.config.project destination = self.config.destination self.root.spec.destination.name = self.cluster.display_name or destination.name - self.root.spec.destination.namespace = destination.namespace + self.root.spec.destination.namespace = destination.get("namespace") self.root.spec.source = self.config.source self.root.spec.syncPolicy = self.config.sync_policy @@ -44,7 +44,6 @@ def body(self): path="generators.argocd.applications", global_generator=True, activation_path="argocd.app_of_apps", - apply_patches=["generators.argocd.defaults.application"], ) class GenArgoCDApplication(kgenlib.BaseStore): def body(self): @@ -53,6 +52,11 @@ def body(self): name = config.get("name", self.name) enabled = config.get("enabled", True) clusters = self.inventory.parameters.get("clusters", {}) + single_cluster = config.get("single_cluster", False) + if single_cluster: + cluster = self.inventory.parameters.get("cluster", {}) + cluster_id = cluster.get("user", None) + clusters = {cluster_id: cluster} if enabled: for cluster in clusters.keys(): @@ -72,40 +76,6 @@ def body(self): self.add(argo_application) -@kgenlib.register_generator( - path="isoflow.graphs", - global_generator=True, - activation_path="argocd.app_of_apps", -) -class GenArgoCDApplicationPerResource(kgenlib.BaseStore): - def body(self): - graph_config = self.config - argocd_application_config = graph_config.get("argocd_application_config", {}) - - # Applies generator defaults to the config of the argocd application - kgenlib.patch_config( - argocd_application_config, - self.inventory, - "parameters.generators.argocd.defaults.application", - ) - - graph_name = graph_config.get("name", self.name) - argocd_application_config.setdefault("source", {}).setdefault("directory", {})[ - "include" - ] = f"{graph_name}.yml" - argocd_application_config["source"].pop("plugin", None) - - namespace = argocd_application_config["destination"]["namespace"] - cluster = argocd_application_config["destination"]["name"] - - graph_name = f"{graph_name}.{namespace}.{cluster}" - - argo_application = ArgoCDApplication( - name=graph_name, config=argocd_application_config - ) - self.add(argo_application) - - class ArgoCDProjectConfigSpec(KubernetesResourceSpec): source_repos: list = [] destinations: list = [] @@ -130,7 +100,8 @@ def body(self): @kgenlib.register_generator( path="generators.argocd.projects", - apply_patches=["generators.argocd.defaults.project"], + global_generator=True, + activation_path="argocd.app_of_apps", ) class GenArgoCDProject(kgenlib.BaseStore): def body(self): diff --git a/kubernetes/autoscaling.py b/kubernetes/autoscaling.py index 2339771..287c5b8 100644 --- a/kubernetes/autoscaling.py +++ b/kubernetes/autoscaling.py @@ -2,7 +2,7 @@ logger = logging.getLogger(__name__) -from .common import KubernetesResource +from .common import KubernetesResource, kgenlib class KedaScaledObject(KubernetesResource): @@ -17,6 +17,16 @@ def body(self): self.root.spec.scaleTargetRef.kind = workload.root.kind self.root.spec.scaleTargetRef.apiVersion = workload.root.apiVersion self.root.spec.update(config.keda_scaled_object) + if self.root.spec.maxReplicaCount == 0: + # keda supports pausing autoscaling + # but doesn't support setting maxReplicaCount to 0 + self.root.metadata.annotations.update( + { + "autoscaling.keda.sh/paused-replicas": "0", + "autoscaling.keda.sh/paused": "true", + } + ) + self.root.spec.maxReplicaCount = 1 # remove replica from workload because HPA is managing it workload.root.spec.pop("replicas") @@ -83,3 +93,19 @@ def body(self): # remove replica from workload because HPA is managing it workload.root.spec.pop("replicas") + + +@kgenlib.register_generator(path="generators.kubernetes.vpa") +class VerticalPodAutoscalerGenerator(kgenlib.BaseStore): + def body(self): + name = self.config.get("name", self.name) + self.config.vpa.update_mode = self.config.update_mode + self.config.vpa.resource_policy = self.config.resource_policy + + workload = KubernetesResource( + name=name, kind=self.config.kind, api_version=self.config.api_version + ) + + self.add( + VerticalPodAutoscaler(name=name, config=self.config, workload=workload) + ) diff --git a/kubernetes/common.py b/kubernetes/common.py index 582fd50..d218c0e 100644 --- a/kubernetes/common.py +++ b/kubernetes/common.py @@ -2,10 +2,11 @@ logger = logging.getLogger(__name__) from enum import StrEnum, auto -from typing import Any, Dict, List, Optional +from typing import Annotated, Any, Dict, List, Optional, Union from kapitan.inputs.kadet import BaseObj, load_from_search_paths -from pydantic import DirectoryPath, Field, FilePath +from pydantic import ConfigDict, DirectoryPath, Field, FilePath, model_validator +from pydantic.alias_generators import to_camel kgenlib = load_from_search_paths("kgenlib") @@ -68,6 +69,8 @@ def body(self): self.root.kind = self.kind self.root.metadata.name = self.rendered_name or self.name + self.root.metadata.labels + self.root.metadata.annotations self.add_label("name", self.name) if self.config: self.add_labels(self.config.labels) @@ -141,6 +144,9 @@ def body(self): def set_namespace(self, namespace: str): self.root.metadata.namespace = namespace + def remove_namespace(self): + self.root.metadata.pop("namespace") + class WorkloadTypes(StrEnum): DEPLOYMENT = auto() @@ -148,6 +154,7 @@ class WorkloadTypes(StrEnum): DAEMONSET = auto() JOB = auto() CRONJOB = auto() + CLOUD_RUN_SERVICE = auto() class RestartPolicy(StrEnum): @@ -156,6 +163,12 @@ class RestartPolicy(StrEnum): NEVER = "Never" +class ConcurrentPolicy(StrEnum): + ALLOW = "Allow" + FORBID = "Forbid" + REPLACE = "Replace" + + class ImagePullPolicy(StrEnum): ALWAYS = "Always" IF_NOT_PRESENT = "IfNotPresent" @@ -286,7 +299,9 @@ class ServiceAccountConfigSpec(KubernetesResourceSpec): namespace: Optional[str] = None rendered_name: Optional[str] = None name: Optional[str] = None - roles: Optional[List[Dict[str, Any]]] = None + roles: Optional[ + list[dict[str, list[str]]] | dict[str, dict[str, list[str]] | list[str]] + ] = None class ContainerSpec(kgenlib.BaseModel): @@ -298,7 +313,7 @@ class ContainerSpec(kgenlib.BaseModel): healthcheck: Optional[HealthCheckConfigSpec] = None image: str = None image_pull_policy: Optional[ImagePullPolicy] = ImagePullPolicy.IF_NOT_PRESENT - lifecycle: dict = {} + lifecycle: Optional[dict] = None pod_annotations: dict = {} pod_labels: dict = {} ports: Dict[str, ContainerPortSpec] = {} @@ -308,6 +323,10 @@ class ContainerSpec(kgenlib.BaseModel): volume_mounts: dict = {} +class InitContainerSpec(ContainerSpec): + sidecar: Optional[bool] = None + + class ServiceTypes(StrEnum): EXTERNAL_NAME = "ExternalName" CLUSTER_IP = "ClusterIP" @@ -322,7 +341,7 @@ class SessionAffinity(StrEnum): class RoleBindingConfigSpec(KubernetesResourceSpec): roleRef: Optional[Dict[str, Any]] = None - subject: Optional[List[Dict[str, Any]]] = None + subjects: Optional[List[Dict[str, Any]]] = None class ServiceConfigSpec(KubernetesResourceSpec): @@ -355,8 +374,9 @@ class NetworkPolicySpec(KubernetesResourceSpec): class WorkloadConfigSpec(KubernetesResourceSpec, ContainerSpec): type: Optional[WorkloadTypes] = WorkloadTypes.DEPLOYMENT + namespace: str | None = None schedule: Optional[str] = None - additional_containers: Optional[Dict[str, ContainerSpec]] = {} + additional_containers: Optional[Dict[str, Union[ContainerSpec, None]]] = {} additional_services: Optional[Dict[str, ServiceConfigSpec]] = {} annotations: dict = {} application: Optional[str] = None @@ -371,7 +391,7 @@ class WorkloadConfigSpec(KubernetesResourceSpec, ContainerSpec): host_pid: Optional[bool] = None hpa: dict = {} image_pull_secrets: list = [] - init_containers: Optional[Dict[str, ContainerSpec]] = {} + init_containers: Optional[Dict[str, Union[InitContainerSpec, None]]] = {} istio_policy: dict = {} keda_scaled_object: dict = {} labels: Dict[str, str] = {} @@ -399,16 +419,25 @@ class WorkloadConfigSpec(KubernetesResourceSpec, ContainerSpec): workload_security_context: dict = {} +class CloudRunServiceConfigSpec(WorkloadConfigSpec): + type: Optional[WorkloadTypes] = WorkloadTypes.CLOUD_RUN_SERVICE + + class DeploymentConfigSpec(WorkloadConfigSpec): type: Optional[WorkloadTypes] = WorkloadTypes.DEPLOYMENT update_strategy: Optional[dict] = {} strategy: Optional[dict] = {} +class PodManagementPolicy(StrEnum): + ORDERED_READY = "OrderedReady" + PARALLEL = "Parallel" + + class StatefulSetConfigSpec(WorkloadConfigSpec): type: WorkloadTypes = WorkloadTypes.STATEFULSET + pod_management_policy: str = PodManagementPolicy.ORDERED_READY update_strategy: dict = {} - strategy: dict = {} class DaemonSetConfigSpec(WorkloadConfigSpec): @@ -426,3 +455,94 @@ class JobConfigSpec(WorkloadConfigSpec): class CronJobConfigSpec(JobConfigSpec): type: WorkloadTypes = WorkloadTypes.CRONJOB schedule: str + concurrency_policy: Optional[ConcurrentPolicy] = ConcurrentPolicy.ALLOW + + +ContainerProbeSpecTypes = Annotated[ + Union[ContainerEXECProbeSpec, ContainerHTTPProbeSpec, ContainerTCPProbeSpec], + Field(discriminator="type"), +] + + +class ContainerProbes(kgenlib.BaseModel): + model_config = ConfigDict( + alias_generator=to_camel, + populate_by_name=True, + from_attributes=True, + extra="ignore", + ) + type: ProbeTypes = Field(exclude=True) + initial_delay_seconds: int = 0 + period_seconds: int = 10 + timeout_seconds: int = 1 + success_threshold: int = 1 + failure_threshold: int = 3 + + # Define a class method for creating probes from data + @classmethod + def from_spec(cls, spec: ContainerProbeSpecTypes) -> Union["ContainerProbes", None]: + probe = None + if not spec or not spec.enabled: + return probe + probe_type = spec.type + if probe_type == ProbeTypes.TCP: + probe = TCPProbe.model_validate(spec) + elif probe_type == ProbeTypes.HTTP: + probe = HTTPProbe.model_validate(spec) + elif probe_type == ProbeTypes.EXEC: + probe = ExecProbe.model_validate(spec) + else: + raise ValueError(f"Invalid probe type: {probe_type}") + return probe.model_dump(by_alias=True) + + +class HttpGet(kgenlib.BaseModel): + model_config = ConfigDict( + alias_generator=to_camel, + populate_by_name=True, + from_attributes=True, + ) + path: str = "/" + port: str | int = 80 + httpHeaders: Optional[List[dict]] = None + scheme: Optional[ProbeSchemeSpec] = ProbeSchemeSpec.HTTP + + +class HTTPProbe(ContainerProbes): + httpGet: Optional[HttpGet] = None + port: str | int = Field(exclude=True) + path: str = Field(exclude=True) + scheme: Optional[ProbeSchemeSpec] = Field(exclude=True) + httpHeaders: Optional[List[dict]] = Field(exclude=True) + + # Use a validator to handle the 'port' mapping + @model_validator(mode="after") + def map_port_to_http_config(self): + self.httpGet = HttpGet.model_validate(self) + return self + + +class TCPProbe(ContainerProbes): + port: str | int = Field(exclude=True) + tcpSocket: Optional[dict] = None + + # Use a validator to handle the 'port' mapping + @model_validator(mode="after") + def map_port_to_tcp_socket(self): + self.tcpSocket = {"port": self.port} + return self + + +class ExecProbe(ContainerProbes): + command: List = Field(exclude=True) + exec: Optional[dict] = None + + @model_validator(mode="after") + def map_command_to_exec_socket(self): + self.exec = {"command": self.command} + return self + + +ContainerProbeTypes = Annotated[ + Union[HTTPProbe, TCPProbe, ExecProbe], Field(discriminator="type") +] diff --git a/kubernetes/files.py b/kubernetes/files.py new file mode 100644 index 0000000..44da6de --- /dev/null +++ b/kubernetes/files.py @@ -0,0 +1,17 @@ +import logging + +from .common import kgenlib + +logger = logging.getLogger(__name__) + + +@kgenlib.register_generator(path="generators.kubernetes.raw") +class RawManifestFilesGenerator(kgenlib.BaseStore): + def body(self): + for file in self.config.files: + self.add(kgenlib.BaseStore.from_yaml_file(file)) + if self.config.filename: + [ + setattr(content, "filename", self.config.filename) + for content in self.content_list + ] diff --git a/kubernetes/istio.py b/kubernetes/istio.py index bdd87ee..59b2193 100644 --- a/kubernetes/istio.py +++ b/kubernetes/istio.py @@ -2,7 +2,7 @@ logger = logging.getLogger(__name__) -from .common import KubernetesResource +from .common import KubernetesResource, kgenlib class IstioPolicy(KubernetesResource): @@ -16,3 +16,18 @@ def body(self): self.root.spec.origins = config.istio_policy.policies.origins self.root.spec.principalBinding = "USE_ORIGIN" self.root.spec.targets = [{"name": name}] + + +@kgenlib.register_generator(path="generators.istio.gateway") +class IstioGatewayGenerator(kgenlib.BaseStore): + def body(self): + self.add(IstioGateway(name=self.id, config=self.config)) + + +class IstioGateway(KubernetesResource): + kind: str = "Gateway" + api_version: str = "networking.istio.io/v1" + + def body(self): + super().body() + self.root.spec = self.config.spec diff --git a/kubernetes/networking.py b/kubernetes/networking.py index d157c0b..b321ab7 100644 --- a/kubernetes/networking.py +++ b/kubernetes/networking.py @@ -2,11 +2,9 @@ logger = logging.getLogger(__name__) -from enum import StrEnum, auto +from enum import StrEnum from typing import Any, Dict, List, Optional -from kadet import BaseModel - from .common import ( KubernetesResource, KubernetesResourceSpec, @@ -200,6 +198,7 @@ def body(self): class GCPBackendConfigSpec(KubernetesResourceSpec): timeout_sec: Optional[int] = 30 logging: Optional[Dict[str, Any]] = None + iap: Optional[Dict[str, Any]] = None class GCPBackendPolicy(KubernetesResource): @@ -212,6 +211,7 @@ def body(self): self.root.spec.default.timeoutSec = self.config.timeout_sec or 30 self.root.spec.default.logging = self.config.logging or {"enabled": False} + self.root.spec.default.iap = self.config.iap self.root.spec.targetRef = { "group": "", @@ -269,6 +269,18 @@ def body(self): self.root.spec.setdefault("rules", []).append(rule) +@kgenlib.register_generator(path="generators.kubernetes.routes") +class RouteGenerator(kgenlib.BaseStore): + config: HTTPRouteSpec + + def body(self): + config = self.config + name = self.name + + route = HTTPRoute(name=name, config=config) + self.add(route) + + @kgenlib.register_generator(path="generators.kubernetes.gateway") class GatewayGenerator(kgenlib.BaseStore): def body(self): @@ -293,18 +305,30 @@ def body(self): route_config, gateway_name=gateway.name, gateway_namespace=gateway.namespace, + namespace=route_config.get("namespace") or gateway.namespace, ), ) self.add(route) for service_id, service_config in route_config.get("services", {}).items(): healthcheck = HealthCheckPolicy( - name=f"{service_id}", config=service_config + name=f"{service_id}", + config=dict( + service_config, + namespace=service_config.get("namespace") or gateway.namespace, + ), ) self.add(healthcheck) backend_policy = GCPBackendPolicy( - name=f"{service_id}", config=GCPBackendConfigSpec(**service_config) + name=f"{service_id}", + config=GCPBackendConfigSpec( + **dict( + service_config, + namespace=route_config.get("namespace") + or gateway.namespace, + ) + ), ) self.add(backend_policy) @@ -335,8 +359,13 @@ def body(self): if spec.headless: self.root.spec.clusterIP = "None" self.root.spec.sessionAffinity = spec.session_affinity + additional_containers = [ + _config + for _config in config.additional_containers.values() + if _config is not None + ] all_ports = [config.ports] + [ - container.ports for container in config.additional_containers.values() + container.ports for container in additional_containers ] self.exposed_ports = {} diff --git a/kubernetes/rbac.py b/kubernetes/rbac.py index 520215e..bbc44f7 100644 --- a/kubernetes/rbac.py +++ b/kubernetes/rbac.py @@ -24,13 +24,19 @@ class ServiceAccount(KubernetesResource): spec: Optional[ServiceAccountComponentConfigSpec] = None def body(self): + if self.config: + config = self.config + if isinstance(config, WorkloadConfigSpec): + self.name = config.service_account.name or self.name + else: + self.name = config.name or self.name super().body() if self.spec: self.add_annotations(self.spec.annotations) class RoleConfigSpec(KubernetesResourceSpec): - rules: List[Dict[str, Any]] = [] + rules: list[dict[str, list[str]]] | dict[str, list[str] | dict[str, list[str]]] = [] class Role(KubernetesResource): @@ -41,32 +47,75 @@ class Role(KubernetesResource): def body(self): super().body() if self.spec: - self.root.rules = self.spec.rules + rules = self.spec.rules + resolved_rules = [] + if isinstance(rules, list): + resolved_rules = rules + elif isinstance(rules, dict): + for rule_names, rule_config in rules.items(): + api_groups = set() + resources = set() + for rule_name in rule_names.split(","): + rule_name = rule_name.strip() + + # by default if apiGroups and resources are not defined + # they're going to be inferred from the rule name + if "/" in rule_name: + api_group, *resource_bits = rule_name.split("/") + resource = "/".join(resource_bits) + else: + api_group = "" + resource = rule_name + api_groups.add(api_group) + resources.add(resource) + + # if rule has form {name: [...]} + # then the list is treated as role verbs + if isinstance(rule_config, list): + rule_config = {"verbs": rule_config} + resolved_rules.append( + { + **{ + "apiGroups": sorted(api_groups), + "resources": sorted(resources), + }, + **rule_config, + } + ) + else: + raise TypeError( + f"rules are wrong type. expected list or dict, found: {type(rules)}" + ) + self.root.rules = resolved_rules class RoleBinding(KubernetesResource): kind: str = "RoleBinding" api_version: str = "rbac.authorization.k8s.io/v1" - role: Role - sa: ServiceAccount + role: Optional[Role] = None + sa: Optional[ServiceAccount] = None spec: Optional[RoleBindingConfigSpec] = RoleBindingConfigSpec() def body(self): super().body() - default_role_ref = { - "apiGroup": "rbac.authorization.k8s.io", - "kind": "Role", - "name": self.role.name, - } - default_subject = [ - { - "kind": "ServiceAccount", - "name": self.sa.name, - "namespace": self.sa.namespace, + if self.role and self.sa: + default_role_ref = { + "apiGroup": "rbac.authorization.k8s.io", + "kind": "Role", + "name": self.role.name, } - ] - self.root.roleRef = self.spec.roleRef or default_role_ref - self.root.subjects = self.spec.subject or default_subject + default_subject = [ + { + "kind": "ServiceAccount", + "name": self.sa.name, + "namespace": self.sa.namespace, + } + ] + self.root.roleRef = default_role_ref + self.root.subjects = default_subject + elif self.spec: + self.root.roleRef = self.spec.roleRef + self.root.subjects = self.spec.subjects class ClusterRoleBindingConfigSpec(KubernetesResourceSpec): @@ -76,7 +125,7 @@ class ClusterRoleBindingConfigSpec(KubernetesResourceSpec): class ClusterRoleConfigSpec(KubernetesResourceSpec): rules: List[Dict[str, Any]] = [] - binding: Optional[ClusterRoleBindingConfigSpec] + binding: Optional[ClusterRoleBindingConfigSpec] = None class ClusterRole(KubernetesResource): @@ -92,26 +141,73 @@ def body(self): class ClusterRoleBinding(KubernetesResource): kind: str = "ClusterRoleBinding" api_version: str = "rbac.authorization.k8s.io/v1" - sa: ServiceAccount - role: ClusterRole - spec: ClusterRoleConfigSpec + sa: Optional[ServiceAccount] = None + role: Optional[ClusterRole] = None + spec: Optional[ClusterRoleBindingConfigSpec] = ClusterRoleBindingConfigSpec() def body(self): super().body() - default_role_ref = { - "apiGroup": "rbac.authorization.k8s.io", - "kind": "ClusterRole", - "name": self.role.name, - } - default_subject = [ - { - "kind": "ServiceAccount", - "name": self.sa.name, - "namespace": self.sa.namespace, + if self.role and self.sa: + default_role_ref = { + "apiGroup": "rbac.authorization.k8s.io", + "kind": "ClusterRole", + "name": self.role.name, } + default_subject = [ + { + "kind": "ServiceAccount", + "name": self.sa.name, + "namespace": self.sa.namespace, + } + ] + self.root.roleRef = default_role_ref + self.root.subjects = default_subject + elif self.spec: + self.root.roleRef = self.spec.roleRef + self.root.subjects = self.spec.subjects + + +@kgenlib.register_generator(path="generators.kubernetes.cluster_rolebinding") +class ClusterRoleBindingGenerator(kgenlib.BaseStore): + name: str + + def body(self): + config = self.config + name = config.name or self.name + subjects = config.get("subjects") or [] + processed_subjects = [ + self._ensure_subject_format(subject) for subject in subjects ] - self.root.roleRef = self.spec.binding.roleRef or default_role_ref - self.root.subjects = self.spec.binding.subject or default_subject + config.subjects = processed_subjects + + cluster_rolebinding = ClusterRoleBinding(name=name, spec=config) + self.add(cluster_rolebinding) + + @staticmethod + def _ensure_subject_format(subject: Any) -> dict[str, str]: + # @TODO merge with RoleBindingGenerator logic + if isinstance(subject, dict): + return subject + + if not isinstance(subject, str): + raise TypeError(f"subject has to be str or dict, not {type(subject)!r}") + + try: + prefix, identity = subject.split(":") + except ValueError: + raise ValueError(f"could not parse {subject!r} into prefix:suffix") + + match prefix: + case "user": + kind = "User" + case "group": + kind = "Group" + case "serviceAccount": + kind = "User" + case _: + raise ValueError(f"unrecognized identity prefix: {prefix!r}") + + return {"kind": kind, "name": identity} @kgenlib.register_generator(path="generators.kubernetes.service_accounts") @@ -141,3 +237,74 @@ def body(self): spec=spec, ) self.add(rb) + + +@kgenlib.register_generator(path="generators.kubernetes.role") +class RoleGenerator(kgenlib.BaseStore): + name: str + + def body(self): + config = self.config + name = config.name or self.name + namespace = config.namespace + spec = {"rules": config.rules} + role = Role(name=name, namespace=namespace, spec=spec) + self.add(role) + + +@kgenlib.register_generator(path="generators.kubernetes.cluster_role") +class ClusterRoleGenerator(kgenlib.BaseStore): + name: str + + def body(self): + config = self.config + name = config.name or self.name + spec = ClusterRoleConfigSpec(rules=config.rules) + role = ClusterRole(name=name, spec=spec) + self.add(role) + + +@kgenlib.register_generator(path="generators.kubernetes.rolebinding") +class RoleBindingGenerator(kgenlib.BaseStore): + name: str + + def body(self): + config = self.config + name = config.name or self.name + namespace = config.namespace + + subjects = config.get("subjects") or [] + processed_subjects = [ + self._ensure_subject_format(subject) for subject in subjects + ] + config.subjects = processed_subjects + + role_binding = RoleBinding(name=name, namespace=namespace, spec=config) + self.add(role_binding) + + @staticmethod + def _ensure_subject_format(subject: Any) -> dict[str, str]: + if isinstance(subject, dict): + return subject + + if not isinstance(subject, str): + raise TypeError(f"subject has to be str or dict, not {type(subject)!r}") + + # user:bamax@google.com + # group:eng-team@google.com + try: + prefix, identity = subject.split(":") + except ValueError: + raise ValueError(f"could not parse {subject!r} into prefix:suffix") + + match prefix: + case "user": + kind = "User" + case "group": + kind = "Group" + case "serviceAccount": + kind = "User" + case _: + raise ValueError(f"unrecognized identity prefix: {prefix!r}") + + return {"kind": kind, "name": identity} diff --git a/kubernetes/storage.py b/kubernetes/storage.py index d041ebc..22e8711 100644 --- a/kubernetes/storage.py +++ b/kubernetes/storage.py @@ -4,6 +4,8 @@ import os from typing import Any, Optional +from kapitan.resources import inventory + from .common import ( ConfigDataSpec, ConfigMapSpec, @@ -28,19 +30,26 @@ def encode_string(unencoded_string): """Encode a string using base64.""" return base64.b64encode(unencoded_string.encode("ascii")).decode("ascii") - def add_directory(self, directory, encode=False, stringdata=False): + def add_directory(self, directory, encode=False, stringdata=False, basedir=None): """Add contents of files in a directory.""" + if basedir is None: + basedir = directory + if directory and os.path.isdir(directory): logger.debug(f"Adding files from directory {directory} to {self.name}.") for filename in os.listdir(directory): - with open(f"{directory}/{filename}", "r") as f: - file_content = f.read() - self.add_item( - filename, - file_content, - request_encode=encode, - stringdata=stringdata, - ) + path = os.path.join(directory, filename) + if os.path.isfile(path): + with open(path, "r") as f: + file_content = f.read() + self.add_item( + os.path.relpath(path, basedir).replace(os.sep, "_"), + file_content, + request_encode=encode, + stringdata=stringdata, + ) + else: + self.add_directory(path, encode, stringdata, basedir=basedir) def add_item(self, key, value, request_encode=False, stringdata=False): """Add a single item to the resource.""" @@ -72,7 +81,10 @@ def add_from_spec(self, key, spec: ConfigDataSpec, stringdata=False): ): value = kgenlib.render_yaml(value) elif spec.template: - value = kgenlib.render_jinja(spec.template, spec.values) + context = spec.values.copy() + context["inventory"] = inventory(None) + context["inventory_global"] = inventory(None, None) + value = kgenlib.render_jinja(spec.template, context) elif spec.file: with open(spec.file, "r") as f: value = f.read() diff --git a/kubernetes/workloads.py b/kubernetes/workloads.py index 8599b6d..9c79c2b 100644 --- a/kubernetes/workloads.py +++ b/kubernetes/workloads.py @@ -1,11 +1,6 @@ import logging -logger = logging.getLogger(__name__) - - -from typing import Any - -from kapitan.inputs.kadet import BaseModel, BaseObj, CompileError, Dict +from kapitan.inputs.kadet import BaseModel, CompileError from .autoscaling import ( HorizontalPodAutoscaler, @@ -15,16 +10,15 @@ ) from .base import MutatingWebhookConfiguration from .common import ( - ContainerEXECProbeSpec, - ContainerHTTPProbeSpec, + CloudRunServiceConfigSpec, + ContainerProbes, ContainerSpec, - ContainerTCPProbeSpec, CronJobConfigSpec, DaemonSetConfigSpec, DeploymentConfigSpec, + InitContainerSpec, JobConfigSpec, KubernetesResource, - ProbeTypes, StatefulSetConfigSpec, WorkloadConfigSpec, WorkloadTypes, @@ -32,11 +26,49 @@ ) from .gke import BackendConfig from .istio import IstioPolicy -from .networking import HealthCheckPolicy, NetworkPolicy, Service +from .networking import NetworkPolicy, Service from .prometheus import PrometheusRule, ServiceMonitor from .rbac import ClusterRole, ClusterRoleBinding, Role, RoleBinding, ServiceAccount from .storage import ComponentConfig, ComponentSecret +logger = logging.getLogger(__name__) + + +class CloudRunResource(KubernetesResource): + def body(self): + super().body() + name = self.name + config = self.config + + self.root.spec.template.metadata.annotations.update(config.pod_annotations) + self.root.spec.template.metadata.labels.update(config.pod_labels) + + if config.service_account.enabled: + self.root.spec.template.spec.serviceAccountName = ( + config.service_account.name or name + ) + + container = Container(name=name, config=config) + additional_containers = [ + Container(name=name, config=_config) + for name, _config in config.additional_containers.items() + if _config is not None + ] + self.add_containers([container]) + self.add_containers(additional_containers) + self.root.spec.template.spec.imagePullSecrets = config.image_pull_secrets + self.add_volumes(config.volumes) + + def add_volumes(self, volumes): + for key, value in volumes.items(): + kgenlib.merge({"name": key}, value) + self.root.spec.template.spec.setdefault("volumes", []).append(value) + + def add_containers(self, containers): + self.root.spec.template.spec.setdefault("containers", []).extend( + [container.root for container in containers] + ) + class Workload(KubernetesResource): def body(self): @@ -46,35 +78,36 @@ def body(self): self.root.spec.template.spec.hostNetwork = config.host_network self.root.spec.template.spec.hostPID = config.host_pid - self.root.spec.template.metadata.annotations.update( - config.pod_annotations or {} - ) + self.root.spec.template.metadata.annotations.update(config.pod_annotations) self.root.spec.template.metadata.labels.update(config.pod_labels) self.add_volumes(config.volumes) self.root.spec.template.spec.securityContext = config.workload_security_context self.root.spec.minReadySeconds = config.min_ready_seconds - if config.service_account and config.service_account.enabled: + if config.service_account.enabled: self.root.spec.template.spec.serviceAccountName = ( config.service_account.name or name ) container = Container(name=name, config=config) additional_containers = [ - Container(name=name, config=config) - for name, config in config.additional_containers.items() + Container(name=name, config=_config) + for name, _config in config.additional_containers.items() + if _config is not None ] self.add_containers([container]) self.add_containers(additional_containers) init_containers = [ - Container(name=name, config=config) - for name, config in config.init_containers.items() + InitContainer(name=name, config=_config) + for name, _config in config.init_containers.items() + if _config is not None ] - self.root.spec.template.spec.restartPolicy = config.restart_policy self.add_init_containers(init_containers) self.root.spec.template.spec.imagePullSecrets = config.image_pull_secrets self.root.spec.template.spec.dnsPolicy = config.dns_policy + + self.root.spec.template.spec.restartPolicy = config.restart_policy self.root.spec.template.spec.terminationGracePeriodSeconds = config.grace_period self.root.spec.template.spec.nodeSelector = config.node_selector @@ -186,6 +219,15 @@ def add_volumes_for_object(self, object): ) +class CloudRunService(CloudRunResource): + kind: str = "Service" + api_version: str = "serving.knative.dev/v1" + config: CloudRunServiceConfigSpec + + def body(self): + super().body() + + class Deployment(Workload): kind: str = "Deployment" api_version: str = "apps/v1" @@ -231,9 +273,10 @@ def body(self): ) self.root.spec.revisionHistoryLimit = config.revision_history_limit - self.root.spec.strategy = config.strategy + self.root.spec.podManagementPolicy = config.pod_management_policy self.root.spec.updateStrategy = config.update_strategy or update_strategy - self.root.spec.serviceName = config.service.service_name if config.service else name + if config.service: + self.root.spec.serviceName = config.service.service_name or name self.set_replicas(config.replicas) self.add_volume_claims(config.volume_claims) @@ -285,10 +328,13 @@ def body(self): job = Job(name=self.name, config=config) self.root.spec.jobTemplate.spec = job.root.spec self.root.spec.schedule = config.schedule + self.root.spec.concurrencyPolicy = config.concurrency_policy self.root.spec.template = None -class Container(ContainerSpec): +class Container(BaseModel): + config: ContainerSpec + @staticmethod def find_key_in_config(key, configs): for name, config in configs.items(): @@ -372,29 +418,6 @@ def add_volume_mounts(self, volume_mounts): kgenlib.merge({"name": key}, value) self.root.setdefault("volumeMounts", []).append(value) - def create_probe( - self, - spec: ContainerEXECProbeSpec | ContainerHTTPProbeSpec | ContainerTCPProbeSpec, - ): - probe = BaseObj() - if spec and spec.enabled: - probe.root.initialDelaySeconds = spec.initial_delay_seconds - probe.root.periodSeconds = spec.period_seconds - probe.root.timeoutSeconds = spec.timeout_seconds - probe.root.successThreshold = spec.success_threshold - probe.root.failureThreshold = spec.failure_threshold - - if spec.type == ProbeTypes.HTTP: - probe.root.httpGet.scheme = spec.scheme - probe.root.httpGet.port = spec.port - probe.root.httpGet.path = spec.path - probe.root.httpGet.httpHeaders = spec.httpHeaders - elif spec.type == ProbeTypes.TCP: - probe.root.tcpSocket.port = spec.port - elif spec.type == ProbeTypes.EXEC: - probe.root.exec.command = spec.command - return probe.root - def body(self): name = self.name config = self.config @@ -402,7 +425,8 @@ def body(self): self.root.name = name self.root.image = config.image self.root.imagePullPolicy = config.image_pull_policy - self.root.lifecycle = config.lifecycle + if config.lifecycle: + self.root.lifecycle = config.lifecycle self.root.resources = config.resources self.root.args = config.args self.root.command = config.command @@ -428,17 +452,29 @@ def body(self): ) if config.healthcheck: - if config.healthcheck.liveness: - self.root.livenessProbe = self.create_probe(config.healthcheck.liveness) - if config.healthcheck.readiness: - self.root.readinessProbe = self.create_probe( - config.healthcheck.readiness - ) - if config.healthcheck.startup: - self.root.startupProbe = self.create_probe(config.healthcheck.startup) + self.root.livenessProbe = ContainerProbes.from_spec( + config.healthcheck.liveness + ) + self.root.readinessProbe = ContainerProbes.from_spec( + config.healthcheck.readiness + ) + self.root.startupProbe = ContainerProbes.from_spec( + config.healthcheck.startup + ) + self.process_envs(config) +class InitContainer(Container): + config: InitContainerSpec + + def body(self): + config = self.config + super().body() + if config.sidecar: + self.root.restartPolicy = "Always" + + class PodSecurityPolicy(KubernetesResource): kind: str = "PodSecurityPolicy" api_version: str = "policy/v1beta1" @@ -528,6 +564,7 @@ def body(self): name = self.name config = self.config logger.debug(f"Generating components for {name} from {config}") + namespace = config.namespace if config.type == WorkloadTypes.DEPLOYMENT: workload = Deployment(name=name, config=config.model_dump()) @@ -540,6 +577,8 @@ def body(self): workload = CronJob(name=name, config=config.model_dump()) else: workload = Job(name=name, config=config.model_dump()) + elif config.type == WorkloadTypes.CLOUD_RUN_SERVICE: + workload = CloudRunService(name=name, config=config.model_dump()) else: raise ValueError(f"Unknown workload type: {config.type}") @@ -603,7 +642,13 @@ def body(self): self.add(workload) # Patch Application - self.apply_patch( - {"metadata": {"labels": {"app.kapicorp.dev/component": self.name}}} - ) - logger.debug(f"Applied metadata patch for {self.name}.") + if type(workload) != CloudRunService: + self.apply_patch( + {"metadata": {"labels": {"app.kapicorp.dev/component": self.name}}} + ) + logger.debug(f"Applied metadata patch for {self.name}.") + + if namespace: + for o in self.get_content_list(): + if o.root.kind != "Namespace": + o.root.metadata.namespace = namespace