From 8fe63202b82c9783212d954dcf7500a82816f1df Mon Sep 17 00:00:00 2001 From: Alberto Soutullo Date: Mon, 15 Apr 2024 16:01:00 +0200 Subject: [PATCH] Download pod logs (#21) * Added method to download pod logs * Fixed typo in ok_value * Added kubeconfig in yaml file * Added missing comment --- scrape.yaml | 2 ++ src/metrics/kubernetes.py | 45 ++++++++++++++++++++++++++++++++++++--- src/metrics/scrapper.py | 5 ++--- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/scrape.yaml b/scrape.yaml index 5058a8a..7182f1e 100644 --- a/scrape.yaml +++ b/scrape.yaml @@ -1,3 +1,5 @@ +general_config: + kubeconfig: "kubeconfig.yaml" scrape_config: $__rate_interval: "121s" step: "60s" diff --git a/src/metrics/kubernetes.py b/src/metrics/kubernetes.py index 0e5eba3..586969a 100644 --- a/src/metrics/kubernetes.py +++ b/src/metrics/kubernetes.py @@ -1,17 +1,23 @@ # Python Imports import socket import logging +import kubernetes +import multiprocessing from typing import List, Tuple -from kubernetes.client import CoreV1Api, V1PodList, V1Service +from kubernetes.client import V1PodList, V1Service from kubernetes.stream import portforward +# Project Imports +from src.utils import path logger = logging.getLogger(__name__) class KubernetesManager: - def __init__(self, api: CoreV1Api): - self._api = api + def __init__(self, kube_config: str): + self._kube_config = kube_config + self._kube_client = kubernetes.config.new_client_from_config(self._kube_config) + self._api = kubernetes.client.CoreV1Api(self._kube_client) def create_connection(self, address, *args, **kwargs) -> socket.socket: dns_name = self._get_dns_name(address) @@ -31,6 +37,39 @@ def create_connection(self, address, *args, **kwargs) -> socket.socket: return pf.socket(port) + @staticmethod + def download_logs_from_pod_asyncable(kube_config: str, namespace: str, pod_name: str, + location: str): + kube_client = kubernetes.config.new_client_from_config(kube_config) + api = kubernetes.client.CoreV1Api(kube_client) + + logs = api.read_namespaced_pod_log(pod_name, namespace=namespace) + + path_location_result = path.prepare_path(location + pod_name + ".log") + + if path_location_result.is_ok(): + with open(f"{path_location_result.ok_value}", "w") as log_file: + log_file.write(logs) + logger.debug(f"Logs from pod {pod_name} downloaded successfully.") + else: + logger.error( + f"Unable to download logs from pod {pod_name}. Error: {path_location_result.err}") + + def download_pod_logs(self, namespace: str, location: str): + logger.info(f"Downloading logs from namespace '{namespace}' to {location}") + pods = self._api.list_namespaced_pod(namespace) + + pool = multiprocessing.Pool() + + for pod in pods.items: + pod_name = pod.metadata.name + pool.apply_async(KubernetesManager.download_logs_from_pod_asyncable, + args=(self._kube_config, namespace, pod_name, location)) + + pool.close() + pool.join() + logger.info("Logs downloaded successfully.") + def _get_dns_name(self, address: List) -> List: dns_name = address[0] if isinstance(dns_name, bytes): diff --git a/src/metrics/scrapper.py b/src/metrics/scrapper.py index 4962a7e..c434391 100644 --- a/src/metrics/scrapper.py +++ b/src/metrics/scrapper.py @@ -3,7 +3,6 @@ import logging from typing import Dict from result import Ok, Err -from kubernetes.client import CoreV1Api # Project Imports from src.metrics import scrape_utils @@ -15,12 +14,12 @@ class Scrapper: - def __init__(self, api: CoreV1Api, url: str, query_config_file: str): + def __init__(self, kube_config: str, url: str, query_config_file: str): self._url = url self._query_config = None self._query_config_file = query_config_file self._set_query_config() - self._k8s = kubernetes.KubernetesManager(api) + self._k8s = kubernetes.KubernetesManager(kube_config) def query_and_dump_metrics(self): # https://github.com/kubernetes-client/python/blob/master/examples/pod_portforward.py