From e3de3c09f396d6611d6b6f21b68900544cdafbdb Mon Sep 17 00:00:00 2001 From: Nikhil Singhal Date: Fri, 18 Oct 2024 13:25:59 +0200 Subject: [PATCH 1/6] fix-kubernetes-pod-status --- .../kubernetes_wait_for_pod_status.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py index 9a51e51c..6270621b 100644 --- a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py +++ b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py @@ -32,10 +32,10 @@ class KubernetesWaitForPodStatusState(Enum): class KubernetesWaitForPodStatus(BaseAction): - def __init__(self, within_cluster: bool): + def __init__(self, within_cluster: bool, namespace: str): super().__init__() self.target = None - self.namespace = None + self.namespace = namespace self.expected_status = None self.within_cluster = within_cluster self.regex = None @@ -53,13 +53,13 @@ def setup(self, **kwargs): self.monitoring_thread = threading.Thread(target=self.watch_pods, daemon=True) self.monitoring_thread.start() - def execute(self, target: str, regex: bool, status: tuple, namespace: str): + def execute(self, target: str, regex: bool, status: tuple, ): self.target = target - self.namespace = namespace if not isinstance(status, tuple) or not isinstance(status[0], str): raise ValueError("Status expected to be enum.") self.expected_status = status[0] self.regex = regex + self.update_queue = queue.Queue() self.current_state = KubernetesWaitForPodStatusState.MONITORING def update(self) -> py_trees.common.Status: @@ -89,8 +89,7 @@ def watch_pods(self): for event in w.stream(self.client.list_namespaced_pod, namespace=self.namespace): pod_name = event['object'].metadata.name pod_status = event['object'].status.phase - if self.current_state == KubernetesWaitForPodStatusState.MONITORING: - self.update_queue.put((pod_name, pod_status)) + self.update_queue.put((pod_name, pod_status)) except ApiException as e: self.logger.error(f"Error accessing kubernetes: {e}") self.update_queue.put(()) From f1ccf4afd3b2fb891a5d800ba76f302192441f62 Mon Sep 17 00:00:00 2001 From: Nikhil Singhal Date: Fri, 18 Oct 2024 13:27:40 +0200 Subject: [PATCH 2/6] format --- .../kubernetes_wait_for_pod_status.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py index 6270621b..1b70ffa6 100644 --- a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py +++ b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py @@ -53,7 +53,7 @@ def setup(self, **kwargs): self.monitoring_thread = threading.Thread(target=self.watch_pods, daemon=True) self.monitoring_thread.start() - def execute(self, target: str, regex: bool, status: tuple, ): + def execute(self, target: str, regex: bool, status: tuple): self.target = target if not isinstance(status, tuple) or not isinstance(status[0], str): raise ValueError("Status expected to be enum.") From 9f8d548ee779441a46d84f03be8440eb53c60de7 Mon Sep 17 00:00:00 2001 From: Nikhil Singhal Date: Mon, 21 Oct 2024 18:31:29 +0200 Subject: [PATCH 3/6] fix feedback message and repeat --- .../kubernetes_wait_for_pod_status.py | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py index 1b70ffa6..f5488d3b 100644 --- a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py +++ b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py @@ -21,27 +21,19 @@ from kubernetes import client, config, watch from kubernetes.client.rest import ApiException import re -from enum import Enum - - -class KubernetesWaitForPodStatusState(Enum): - IDLE = 1 - MONITORING = 2 - FAILURE = 3 class KubernetesWaitForPodStatus(BaseAction): - def __init__(self, within_cluster: bool, namespace: str): + def __init__(self, within_cluster: bool): super().__init__() self.target = None - self.namespace = namespace self.expected_status = None self.within_cluster = within_cluster self.regex = None self.client = None self.update_queue = queue.Queue() - self.current_state = KubernetesWaitForPodStatusState.IDLE + self.is_pod = False def setup(self, **kwargs): if self.within_cluster: @@ -50,35 +42,37 @@ def setup(self, **kwargs): config.load_kube_config() self.client = client.CoreV1Api() - self.monitoring_thread = threading.Thread(target=self.watch_pods, daemon=True) - self.monitoring_thread.start() - - def execute(self, target: str, regex: bool, status: tuple): + def execute(self, target: str, regex: bool, status: tuple, namespace: str): + self.namespace = namespace self.target = target if not isinstance(status, tuple) or not isinstance(status[0], str): raise ValueError("Status expected to be enum.") self.expected_status = status[0] self.regex = regex - self.update_queue = queue.Queue() - self.current_state = KubernetesWaitForPodStatusState.MONITORING + self.is_pod = False + self.monitoring_thread = threading.Thread(target=self.watch_pods, daemon=True) + self.monitoring_thread.start() def update(self) -> py_trees.common.Status: while not self.update_queue.empty(): item = self.update_queue.get() if len(item) != 2: return py_trees.common.Status.FAILURE - - self.feedback_message = f"waiting for status of pod '{self.target}'." # pylint: disable= attribute-defined-outside-init + if not self.is_pod: + self.feedback_message = f"waiting for status of pod '{self.target}'." # pylint: disable= attribute-defined-outside-init if not self.regex: if item[0] != self.target: continue else: if not re.search(self.target, item[0]): continue + # self.feedback_message = f"The pod '{self.target}' is currently in '{item[1].lower()}' status." # pylint: disable= attribute-defined-outside-init if item[1].lower() == self.expected_status: + self.is_pod = True self.feedback_message = f"Pod '{item[0]}' changed to expected status '{item[1].lower()}'." # pylint: disable= attribute-defined-outside-init return py_trees.common.Status.SUCCESS else: + self.is_pod = True self.feedback_message = f"Pod '{item[0]}' changed to status '{item[1].lower()}', expected '{self.expected_status}'." # pylint: disable= attribute-defined-outside-init return py_trees.common.Status.RUNNING From 609f0fe4c8141af73ba96b375cb881bd88b11f59 Mon Sep 17 00:00:00 2001 From: Nikhil Singhal Date: Tue, 22 Oct 2024 10:04:17 +0200 Subject: [PATCH 4/6] cleanup --- .../kubernetes_wait_for_pod_status.py | 1 - 1 file changed, 1 deletion(-) diff --git a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py index f5488d3b..131460f4 100644 --- a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py +++ b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py @@ -66,7 +66,6 @@ def update(self) -> py_trees.common.Status: else: if not re.search(self.target, item[0]): continue - # self.feedback_message = f"The pod '{self.target}' is currently in '{item[1].lower()}' status." # pylint: disable= attribute-defined-outside-init if item[1].lower() == self.expected_status: self.is_pod = True self.feedback_message = f"Pod '{item[0]}' changed to expected status '{item[1].lower()}'." # pylint: disable= attribute-defined-outside-init From 15f7551df0885a88cf1317eddd5766f3e34fbb68 Mon Sep 17 00:00:00 2001 From: Nikhil Singhal Date: Wed, 13 Nov 2024 13:01:19 +0100 Subject: [PATCH 5/6] fix action --- .../kubernetes_wait_for_pod_status.py | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py index 131460f4..041ad51c 100644 --- a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py +++ b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py @@ -21,19 +21,28 @@ from kubernetes import client, config, watch from kubernetes.client.rest import ApiException import re +from enum import Enum + + +class KubernetesWaitForPodStatusState(Enum): + IDLE = 1 + MONITORING = 2 + FAILURE = 3 class KubernetesWaitForPodStatus(BaseAction): - def __init__(self, within_cluster: bool): + def __init__(self, within_cluster: bool, namespace: str): super().__init__() self.target = None + self.namespace = namespace self.expected_status = None self.within_cluster = within_cluster self.regex = None self.client = None self.update_queue = queue.Queue() - self.is_pod = False + self.current_state = KubernetesWaitForPodStatusState.IDLE + self.last_state = None def setup(self, **kwargs): if self.within_cluster: @@ -42,23 +51,28 @@ def setup(self, **kwargs): config.load_kube_config() self.client = client.CoreV1Api() - def execute(self, target: str, regex: bool, status: tuple, namespace: str): - self.namespace = namespace + self.monitoring_thread = threading.Thread(target=self.watch_pods, daemon=True) + self.monitoring_thread.start() + + def execute(self, target: str, regex: bool, status: tuple, ): self.target = target if not isinstance(status, tuple) or not isinstance(status[0], str): raise ValueError("Status expected to be enum.") self.expected_status = status[0] self.regex = regex - self.is_pod = False - self.monitoring_thread = threading.Thread(target=self.watch_pods, daemon=True) - self.monitoring_thread.start() + self.current_state = KubernetesWaitForPodStatusState.MONITORING + self.last_state = None def update(self) -> py_trees.common.Status: + print('que started.....') + for item in list(self.update_queue.queue): + print(item) + print('que end.....') while not self.update_queue.empty(): item = self.update_queue.get() if len(item) != 2: return py_trees.common.Status.FAILURE - if not self.is_pod: + if self.last_state is None: self.feedback_message = f"waiting for status of pod '{self.target}'." # pylint: disable= attribute-defined-outside-init if not self.regex: if item[0] != self.target: @@ -66,23 +80,29 @@ def update(self) -> py_trees.common.Status: else: if not re.search(self.target, item[0]): continue - if item[1].lower() == self.expected_status: - self.is_pod = True + if item[1].lower() == self.expected_status and self.last_state is not None: self.feedback_message = f"Pod '{item[0]}' changed to expected status '{item[1].lower()}'." # pylint: disable= attribute-defined-outside-init + self.current_state = KubernetesWaitForPodStatusState.IDLE return py_trees.common.Status.SUCCESS else: - self.is_pod = True self.feedback_message = f"Pod '{item[0]}' changed to status '{item[1].lower()}', expected '{self.expected_status}'." # pylint: disable= attribute-defined-outside-init + self.last_state = item[1].lower() return py_trees.common.Status.RUNNING def watch_pods(self): w = watch.Watch() try: - # TODO: make use of send_initial_events=false in the future + initial_pods = self.client.list_namespaced_pod(namespace=self.namespace).items + for pod in initial_pods: + pod_name = pod.metadata.name + pod_status = pod.status.phase + self.update_queue.put((pod_name, pod_status)) for event in w.stream(self.client.list_namespaced_pod, namespace=self.namespace): + print('Thread is runnnin.....') pod_name = event['object'].metadata.name pod_status = event['object'].status.phase - self.update_queue.put((pod_name, pod_status)) + if self.current_state == KubernetesWaitForPodStatusState.MONITORING: + self.update_queue.put((pod_name, pod_status)) except ApiException as e: - self.logger.error(f"Error accessing kubernetes: {e}") + self.logger.error(f"Error accessing Kubernetes: {e}") self.update_queue.put(()) From e0a23100fea8f670b41b97d1458dd6e1d777a719 Mon Sep 17 00:00:00 2001 From: Nikhil Singhal Date: Wed, 13 Nov 2024 13:12:49 +0100 Subject: [PATCH 6/6] cleanup --- .../kubernetes_wait_for_pod_status.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py index 041ad51c..cb00501c 100644 --- a/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py +++ b/libs/scenario_execution_kubernetes/scenario_execution_kubernetes/kubernetes_wait_for_pod_status.py @@ -64,10 +64,6 @@ def execute(self, target: str, regex: bool, status: tuple, ): self.last_state = None def update(self) -> py_trees.common.Status: - print('que started.....') - for item in list(self.update_queue.queue): - print(item) - print('que end.....') while not self.update_queue.empty(): item = self.update_queue.get() if len(item) != 2: @@ -98,7 +94,6 @@ def watch_pods(self): pod_status = pod.status.phase self.update_queue.put((pod_name, pod_status)) for event in w.stream(self.client.list_namespaced_pod, namespace=self.namespace): - print('Thread is runnnin.....') pod_name = event['object'].metadata.name pod_status = event['object'].status.phase if self.current_state == KubernetesWaitForPodStatusState.MONITORING: