From c640ba0e30d282107c1c436f5709ebbe21d6e530 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Tue, 7 Nov 2023 22:31:21 +0330 Subject: [PATCH 01/17] Add pickle utils Add functions to save and load from root path of projet --- utils/pickle_utils.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 utils/pickle_utils.py diff --git a/utils/pickle_utils.py b/utils/pickle_utils.py new file mode 100644 index 0000000..6b38123 --- /dev/null +++ b/utils/pickle_utils.py @@ -0,0 +1,36 @@ +import pickle +import os +import pathlib +import logging +from dotenv import load_dotenv + +load_dotenv() + +log_level = os.getenv('LOG_LEVEL','info').upper() +script_path=os.path.dirname(os.path.abspath(__file__)) +project_path = str(pathlib.Path(script_path).parent.absolute()) + +logger = logging.getLogger(__name__) +logger.setLevel(log_level) + +def save_as_pickle(python_object:list|dict|tuple,file_name:str) -> None: + with open(os.path.join(project_path,file_name),'wb') as fp: + try: + pickle.dump(python_object,fp) + except Exception as e: + logger.error(f"Can't save {file_name} " + f"file as picke.\n{e}",exc_info=True) + else: + logger.debug("Save picke file successfully") + +def load_dict_pickle(file_name:str) -> dict: + if not os.path.isfile(os.path.join(project_path,file_name)): + return dict() + with open(os.path.join(project_path,file_name),'wb') as fp: + try: + python_object = pickle.load(fp) + except Exception as e: + logger.error(f"Can't load {file_name} " + f"file as picke.\n{e}",exc_info=True) + else: + logger.debug("picke file loaded successfully") \ No newline at end of file From b55bcb0cbfe8860f9dc72611c7208a65d9e746d3 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Tue, 7 Nov 2023 23:10:40 +0330 Subject: [PATCH 02/17] Add configs folder configs will be define here as dataclasses --- configs/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 configs/__init__.py diff --git a/configs/__init__.py b/configs/__init__.py new file mode 100644 index 0000000..e69de29 From c3735f3b2d2907f5f46d4ca9df596ae8822e95f0 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Tue, 7 Nov 2023 23:16:02 +0330 Subject: [PATCH 03/17] Add configs.py All configs will define here as dataclass --- configs/configs.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 configs/configs.py diff --git a/configs/configs.py b/configs/configs.py new file mode 100644 index 0000000..c469d29 --- /dev/null +++ b/configs/configs.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass + +@dataclass(frozen=True) +class BackOffConfs: + max_restart : int = 7 + exponential_ratio : int = 1 + + From 4b1bcff69f83a9d9f5375226ae9bae591fc5898e Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Tue, 7 Nov 2023 23:16:41 +0330 Subject: [PATCH 04/17] Add backoff feature Using pickle file, status of failed connectors (number of restart and number of times they have been seen) recorded. Each connector/task will restart at most BackOffConfs.max_restart and gap between each restart will be increased by BackOffConfs.exponential_ratio --- functionalities/connector_restart.py | 119 ++++++++++++++++++++++----- 1 file changed, 98 insertions(+), 21 deletions(-) diff --git a/functionalities/connector_restart.py b/functionalities/connector_restart.py index 45794dd..d047e24 100644 --- a/functionalities/connector_restart.py +++ b/functionalities/connector_restart.py @@ -1,10 +1,12 @@ import logging import os from dotenv import load_dotenv -from rich.logging import RichHandler +from math import log +from utils.pickle_utils import save_as_pickle,load_dict_pickle from utils.kafka_connect_utils import (get_connectors_status, restart_connector, restart_task) +from configs.configs import BackOffConfs load_dotenv() @@ -35,6 +37,39 @@ def extract_failed_tasks(connectors_status:dict) -> list: return failed_tasks +def remove_healthy_from_gaurdian_memory_connector(guardian_memory_connector:dict[str,list[int,int]], + failed_connectors:tuple[str]) -> dict: + for conn in guardian_memory_connector: + + if conn not in failed_connectors: + guardian_memory_connector.pop(conn) + logger.debug(f"{conn} removed from " + "gaurdian_memory_connector") + return guardian_memory_connector + +def remove_healthy_from_gaurdian_memory_task(guardian_memory_task:dict[str,dict[str,list[int,int]]], + failed_tasks:list[tuple[str,str]]) -> dict: + for conn in guardian_memory_task: + task_ids = list(guardian_memory_task[conn].keys()) + for task_id in task_ids: + if (conn,task_id) not in failed_tasks: + guardian_memory_task[conn].pop(task_id) + logger.debug(f"task: {task_id} of {conn} removed from " + "gaurdian_memory_task") + guardian_memory_task = dict(filter(lambda x:x[1] != {},guardian_memory_task.items())) + + return guardian_memory_task + + +def should_connector_restart(reset:int, + seen:int) -> bool: + if reset == BackOffConfs.max_restart: + return False + elif seen == 0 or log(seen,BackOffConfs.exponential_ratio).is_integer(): + return True + else: + return False + def restart_failed_connectors_and_tasks(): connectors_status = get_connectors_status() if not connectors_status: @@ -42,30 +77,72 @@ def restart_failed_connectors_and_tasks(): "connectors. Please check the logs") return None + guardian_memory_connector = load_dict_pickle("guardian_memory_connector.pickle") + guardian_memory_task = load_dict_pickle("guardian_memory_task.pickle") + failed_connectors = extract_failed_connectors(connectors_status) + guardian_memory_connector = remove_healthy_from_gaurdian_memory_connector( + guardian_memory_connector, + failed_connectors + ) for conn in failed_connectors: - logger.info(f"Restarting [b]{conn}[/b]..") - restart_status = restart_connector(conn) - if restart_status == True: - logger.info(f"[b]{conn}[/b] " - "Restarted [green]successfully[/green]") + if conn not in guardian_memory_connector: + guardian_memory_connector[conn] = [0,0] + + reset,seen = guardian_memory_connector[conn] + + if not should_connector_restart(reset,seen): + logger.debug(f"{conn} will be restart later.") + guardian_memory_connector[conn] = [reset,seen+1] + continue else: - logger.error(f"Restarting [b]{conn}[/b] " - "was [red]failed[/red]") + logger.info(f"Restarting [b]{conn}[/b]..") + restart_status = restart_connector(conn) + if restart_status == True: + logger.info(f"[b]{conn}[/b] " + "Restarted [green]successfully[/green]") + else: + logger.error(f"Restarting [b]{conn}[/b] " + "was [red]failed[/red]") + guardian_memory_connector[conn] = [reset+1,seen+1] + save_as_pickle( + guardian_memory_connector, + "guardian_memory_connector.pickle") + + failed_tasks = extract_failed_tasks(connectors_status) + guardian_memory_task = remove_healthy_from_gaurdian_memory_task( + guardian_memory_task, + failed_tasks + ) + for conn,task_id in failed_tasks: + if conn not in guardian_memory_task: + guardian_memory_task[conn] = {task_id:[0,0]} + elif task_id not in guardian_memory_task[conn]: + guardian_memory_task[conn] = {task_id:[0,0]} - failed_tasks = extract_failed_tasks(connectors_status) - for conn,task_id in failed_tasks: - logger.info(f"Restarting task [i]{task_id}[/i] of " - f"[b]{conn}[/b]..") - restart_status = restart_task(conn,task_id) - if restart_status == True: - logger.info(f"task [i]{task_id}[/i] of " - f"[b]{conn}[/b] " - "Restarted [green]successfully[/green]") - else: - logger.error(f"Restarting task [i]{task_id}[/i] of " - f"[b]{conn}[/b] " - "was [red]failed[/red]") + reset,seen = guardian_memory_task[conn][task_id] + + if not should_connector_restart(reset,seen): + logger.debug(f"taks:{task_id} of {conn} will be restart later.") + guardian_memory_task[conn][task_id] = [reset,seen+1] + continue + else: + logger.info(f"Restarting task [i]{task_id}[/i] of " + f"[b]{conn}[/b]..") + restart_status = restart_task(conn,task_id) + if restart_status == True: + logger.info(f"task [i]{task_id}[/i] of " + f"[b]{conn}[/b] " + "Restarted [green]successfully[/green]") + else: + logger.error(f"Restarting task [i]{task_id}[/i] of " + f"[b]{conn}[/b] " + "was [red]failed[/red]") + guardian_memory_task[conn][task_id] = [reset+1,seen+1] + save_as_pickle( + guardian_memory_task, + "guardian_memory_task.pickle") + if not failed_connectors and not failed_tasks: logger.info("All tasks and connectors are " "[green]healthy[/green] " From c7b4042bca97f6de182a9e01bb37f29f17f1adc3 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:04:39 +0330 Subject: [PATCH 05/17] Add return to load_picke It will return emtpy dict in case of error --- utils/pickle_utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/utils/pickle_utils.py b/utils/pickle_utils.py index 6b38123..c636f47 100644 --- a/utils/pickle_utils.py +++ b/utils/pickle_utils.py @@ -31,6 +31,9 @@ def load_dict_pickle(file_name:str) -> dict: python_object = pickle.load(fp) except Exception as e: logger.error(f"Can't load {file_name} " - f"file as picke.\n{e}",exc_info=True) + f"file as picke, empty dict " + "will be returend.\n{e}",exc_info=True) + return dict() else: - logger.debug("picke file loaded successfully") \ No newline at end of file + logger.debug("picke file loaded successfully") + return python_object \ No newline at end of file From 39f6fd5cad90fc890e0f911b88aaacac9c72eb91 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:05:47 +0330 Subject: [PATCH 06/17] Get values of BackOffConf from environment --- configs/configs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/configs/configs.py b/configs/configs.py index c469d29..4245499 100644 --- a/configs/configs.py +++ b/configs/configs.py @@ -1,8 +1,9 @@ from dataclasses import dataclass +from os import getenv @dataclass(frozen=True) class BackOffConfs: - max_restart : int = 7 - exponential_ratio : int = 1 + max_restart : int = int(getenv("MAX_RESTART",7)) + exponential_ratio : int = int(getenv("EXPONENTIAL_RATIO",1)) From 7ceaacb732ee6c4de2124bb4ee7a728b7bdb8b96 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:06:50 +0330 Subject: [PATCH 07/17] Refactor connector restart Whole functionality encapsulate in ConnectorRestart in order to more readability and control over code --- functionalities/connector_restart.py | 236 +++++++++++++-------------- 1 file changed, 109 insertions(+), 127 deletions(-) diff --git a/functionalities/connector_restart.py b/functionalities/connector_restart.py index d047e24..133d9f0 100644 --- a/functionalities/connector_restart.py +++ b/functionalities/connector_restart.py @@ -15,135 +15,117 @@ logger = logging.getLogger(__name__) logger.setLevel(log_level) -def extract_failed_connectors(connectors_status:dict) -> tuple: - failed_connectors = tuple( - map( - lambda x:x[0], - filter( - lambda x:x[1]['connector'] == 'FAILED', - connectors_status.items() - ) - ) - ) - return failed_connectors - -def extract_failed_tasks(connectors_status:dict) -> list: - failed_tasks = [] - for conn in connectors_status: - tasks_stat = connectors_status[conn]['tasks'] - for task_id in tasks_stat: - if tasks_stat[task_id] == 'FAILED': - failed_tasks.append((conn,task_id)) - - return failed_tasks - -def remove_healthy_from_gaurdian_memory_connector(guardian_memory_connector:dict[str,list[int,int]], - failed_connectors:tuple[str]) -> dict: - for conn in guardian_memory_connector: - - if conn not in failed_connectors: - guardian_memory_connector.pop(conn) - logger.debug(f"{conn} removed from " - "gaurdian_memory_connector") - return guardian_memory_connector - -def remove_healthy_from_gaurdian_memory_task(guardian_memory_task:dict[str,dict[str,list[int,int]]], - failed_tasks:list[tuple[str,str]]) -> dict: - for conn in guardian_memory_task: - task_ids = list(guardian_memory_task[conn].keys()) - for task_id in task_ids: - if (conn,task_id) not in failed_tasks: - guardian_memory_task[conn].pop(task_id) - logger.debug(f"task: {task_id} of {conn} removed from " - "gaurdian_memory_task") - guardian_memory_task = dict(filter(lambda x:x[1] != {},guardian_memory_task.items())) - - return guardian_memory_task - - -def should_connector_restart(reset:int, - seen:int) -> bool: - if reset == BackOffConfs.max_restart: - return False - elif seen == 0 or log(seen,BackOffConfs.exponential_ratio).is_integer(): - return True - else: - return False +class ConnectorRestarter(): -def restart_failed_connectors_and_tasks(): - connectors_status = get_connectors_status() - if not connectors_status: - logger.critical("Can't get [b]status[/b] of " - "connectors. Please check the logs") - return None + def __init__(self) -> None: + self.guardian_memory_connector:dict[str,list[int]] = load_dict_pickle("guardian_memory_connector.pickle") + self.guardian_memory_task:dict[str,dict[int,list[int]]] = load_dict_pickle("guardian_memory_task.pickle") + + def extract_failed_connectors(self, connectors_status: dict) -> list[str]: + return [connector for connector, status in connectors_status.items() if status['connector'] == 'FAILED'] + + def extract_failed_tasks(self, connectors_status: dict) -> list[tuple[str,int]]: + failed_tasks = [] + for conn in connectors_status: + tasks_stat = connectors_status[conn]['tasks'] + for task_id in tasks_stat: + if tasks_stat[task_id] == 'FAILED': + failed_tasks.append((conn,task_id)) + return failed_tasks - guardian_memory_connector = load_dict_pickle("guardian_memory_connector.pickle") - guardian_memory_task = load_dict_pickle("guardian_memory_task.pickle") - - failed_connectors = extract_failed_connectors(connectors_status) - guardian_memory_connector = remove_healthy_from_gaurdian_memory_connector( - guardian_memory_connector, - failed_connectors - ) - for conn in failed_connectors: - if conn not in guardian_memory_connector: - guardian_memory_connector[conn] = [0,0] - - reset,seen = guardian_memory_connector[conn] - - if not should_connector_restart(reset,seen): - logger.debug(f"{conn} will be restart later.") - guardian_memory_connector[conn] = [reset,seen+1] - continue + def remove_healthy_from_guardian_memory_connector(self, failed_connectors: list[str]) -> None: + self.guardian_memory_connector:dict[str,list[int]] = {conn: status + for conn, status in self.guardian_memory_connector.items() + if conn in failed_connectors} + + def remove_healthy_from_guardian_memory_task(self, failed_tasks: list[tuple[str,int]]): + for conn in self.guardian_memory_task: + for task_id in self.guardian_memory_task[conn].keys(): + if (conn,task_id) not in failed_tasks: + self.guardian_memory_task[conn].pop(task_id) + + self.guardian_memory_task = dict( + filter(lambda x:x[1] != {},self.guardian_memory_task.items()) + ) + + def should_connector_restart(self, + reset:int, + seen:int) -> bool: + if reset == BackOffConfs.max_restart: + return False + elif seen == 0 or log(seen,BackOffConfs.exponential_ratio).is_integer(): + return True else: - logger.info(f"Restarting [b]{conn}[/b]..") - restart_status = restart_connector(conn) - if restart_status == True: - logger.info(f"[b]{conn}[/b] " - "Restarted [green]successfully[/green]") - else: - logger.error(f"Restarting [b]{conn}[/b] " - "was [red]failed[/red]") - guardian_memory_connector[conn] = [reset+1,seen+1] - save_as_pickle( - guardian_memory_connector, - "guardian_memory_connector.pickle") - - failed_tasks = extract_failed_tasks(connectors_status) - guardian_memory_task = remove_healthy_from_gaurdian_memory_task( - guardian_memory_task, - failed_tasks + return False + + def restart_connector(self, connector:str) -> None: + reset, seen = self.guardian_memory_connector.get(connector, [0, 0]) + + if not self.should_connector_restart(reset, seen): + logger.debug(f"{connector} will be restarted later.") + self.guardian_memory_connector[connector] = [reset, seen + 1] + return + + restart_status = restart_connector(connector) + if restart_status: + logger.info(f"[b]{connector}[/b] " + "Restarted {reset} out of " + f"{BackOffConfs.max_restart} times " + "[green]successfully[/green]") + else: + logger.error(f"Restarting [b]{connector}[/b] " + "was [red]failed[/red]") + + self.guardian_memory_connector[connector] = [reset + 1, seen + 1] + save_as_pickle(self.guardian_memory_connector, "guardian_memory_connector.pickle") + + def restart_task(self, connector:str, task_id:int) -> None: + reset, seen = self.guardian_memory_task.get(connector,{}).get(task_id,[0,0]) + + if not self.should_connector_restart(reset, seen): + logger.debug(f"Task {task_id} of [b]{connector}[/b] " + "will be restarted later.") + self.guardian_memory_task.update( + {connector:{task_id:[reset, seen + 1]}} + ) + return + + restart_status = restart_task(connector, task_id) + if restart_status: + logger.info(f"task [i]{task_id}[/i] of " + f"[b]{connector}[/b] " + f"Restarted {reset} out of " + f"{BackOffConfs.max_restart} times " + "[green]successfully[/green]") + else: + logger.error(f"Restarting task [i]{task_id}[/i] of " + f"[b]{connector}[/b] " + "was [red]failed[/red]") + self.guardian_memory_task.update( + {connector:{task_id:[reset + 1, seen + 1]}} ) + save_as_pickle(self.guardian_memory_task, "guardian_memory_task.pickle") + + def restart_failed_connectors_and_tasks(self) -> None: + connectors_status = get_connectors_status() + if not connectors_status: + logger.critical("Can't get [b]status[/b] of " + "connectors. Please check the logs") + return + + failed_connectors = self.extract_failed_connectors(connectors_status) + self.remove_healthy_from_guardian_memory_connector(failed_connectors) + + for conn in failed_connectors: + self.restart_connector(conn) + + failed_tasks = self.extract_failed_tasks(connectors_status) + self.remove_healthy_from_guardian_memory_task(failed_tasks) + for conn,task_id in failed_tasks: - if conn not in guardian_memory_task: - guardian_memory_task[conn] = {task_id:[0,0]} - elif task_id not in guardian_memory_task[conn]: - guardian_memory_task[conn] = {task_id:[0,0]} - - reset,seen = guardian_memory_task[conn][task_id] - - if not should_connector_restart(reset,seen): - logger.debug(f"taks:{task_id} of {conn} will be restart later.") - guardian_memory_task[conn][task_id] = [reset,seen+1] - continue - else: - logger.info(f"Restarting task [i]{task_id}[/i] of " - f"[b]{conn}[/b]..") - restart_status = restart_task(conn,task_id) - if restart_status == True: - logger.info(f"task [i]{task_id}[/i] of " - f"[b]{conn}[/b] " - "Restarted [green]successfully[/green]") - else: - logger.error(f"Restarting task [i]{task_id}[/i] of " - f"[b]{conn}[/b] " - "was [red]failed[/red]") - guardian_memory_task[conn][task_id] = [reset+1,seen+1] - save_as_pickle( - guardian_memory_task, - "guardian_memory_task.pickle") - - if not failed_connectors and not failed_tasks: - logger.info("All tasks and connectors are " - "[green]healthy[/green] " - "[yellow]:)[/yellow]") \ No newline at end of file + self.restart_task(conn, task_id) + + if not failed_connectors and not failed_tasks: + logger.info("All tasks and connectors are " + "[green]healthy[/green] " + "[yellow]:)[/yellow]") From 7ea91f8a85d302b17ae5f92283f57b102a7d1980 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:07:57 +0330 Subject: [PATCH 08/17] Add BackOff variables with default values --- .env.sample | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.env.sample b/.env.sample index bf661f9..38cf5fb 100644 --- a/.env.sample +++ b/.env.sample @@ -6,4 +6,7 @@ KAFKA_CONNECT_PORT="8083" KAFKA_CONNECT_USER="" KAFKA_CONNECT_PASS="" +MAX_RESTART=7 +EXPONENTIAL_RATIO=1 + From 563aae29ea984e4538ec1e9d2716d6140014a560 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:09:01 +0330 Subject: [PATCH 09/17] Add BackOff variables with default values --- deploy/docker-compose.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/deploy/docker-compose.yaml b/deploy/docker-compose.yaml index ac0980e..ead69c8 100644 --- a/deploy/docker-compose.yaml +++ b/deploy/docker-compose.yaml @@ -12,3 +12,5 @@ services: - KAFKA_CONNECT_PORT=8083 - KAFKA_CONNECT_PROTOCOL=http - LOG_LEVEL=info + - MAX_RESTART=7 + - EXPONENTIAL_RATIO=1 From bd1862466d42b8bcbf4b7497c219ef5090cb4c4f Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:09:37 +0330 Subject: [PATCH 10/17] Refactor values.yml Reorganize values in order to more readability and easier useage --- deploy/chart/values.yaml | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/deploy/chart/values.yaml b/deploy/chart/values.yaml index 1b5a5d2..e044620 100644 --- a/deploy/chart/values.yaml +++ b/deploy/chart/values.yaml @@ -1,24 +1,22 @@ -guard: +gaurdianSpec: image: docker.io/anvaari/connector-guardian:0.2.0 -spec: replicas: 1 + resources: + limits: + cpu: 50m + memory: 100Mi + requests: + cpu: 20m + memory: 20Mi -containers: - resources: - limits: - cpu: 50m - memory: 100Mi - requests: - cpu: 20m - memory: 20Mi - env: - - name: KAFKA_CONNECT_HOST - value: 'localhost' - - name: KAFKA_CONNECT_USER - value: '' - - name: KAFKA_CONNECT_PASS - value: '' - - name: KAFKA_CONNECT_PORT - value: '8083' - - name: KAFKA_CONNECT_PROTOCOL - value: 'http' +guardianConfig: + env: + name: guardian-configs + data: + KAFKA_CONNECT_HOST: localhost + KAFKA_CONNECT_USER: '' + KAFKA_CONNECT_PASS: '' + KAFKA_CONNECT_PORT: '8083' + KAFKA_CONNECT_PROTOCOL: 'http' + MAX_RESTART: '7' + EXPONENTIAL_RATIO: '1' \ No newline at end of file From 4cef7826508fd3b3bb7dac4064cf242eef77c7a0 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:11:15 +0330 Subject: [PATCH 11/17] Add configMap It will replace with direct environment set --- deploy/chart/templates/config-map.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 deploy/chart/templates/config-map.yml diff --git a/deploy/chart/templates/config-map.yml b/deploy/chart/templates/config-map.yml new file mode 100644 index 0000000..92c38e6 --- /dev/null +++ b/deploy/chart/templates/config-map.yml @@ -0,0 +1,11 @@ +{{- with .Values.guardianConfig.env }} +{{- if .}} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .name }} +data: + {{- toYaml .data | nindent 2 }} +{{- end }} +{{- end }} + From a1952f8d602c24f854a4d6de6909cb1933c9d9de Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:19:12 +0330 Subject: [PATCH 12/17] Change value's path regarding to change in values file Also Change direct environment set to read from configMap --- deploy/chart/templates/deployment.yaml | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/deploy/chart/templates/deployment.yaml b/deploy/chart/templates/deployment.yaml index b7d8168..fe32cf8 100644 --- a/deploy/chart/templates/deployment.yaml +++ b/deploy/chart/templates/deployment.yaml @@ -3,7 +3,7 @@ kind: Deployment metadata: name: connector-guardian spec: - replicas: {.Values.spec.replicas} + replicas: {{ .Values.gaurdianSpec.replicas }} selector: matchLabels: app: connector-guardian @@ -14,10 +14,12 @@ spec: spec: containers: - name: connector-guardian - image: {{ .Values.guard.image }} + image: {{ .Values.gaurdianSpec.image }} command: ["/bin/sh"] args: ["-c", "while true; do python /connector-guardian/connector_guardian.py; sleep 300; done"] resources: - {{- toYaml ".Values.containers.resources" | indent 4 }} - env: - {{- toYaml ".Values.containers.env" | indent 4 }} + {{- toYaml .Values.gaurdianSpec.resources | nindent 10 }} + envFrom: + - configMapRef: + name: {{ .Values.guardianConfig.env.name }} + From 100f6da11a5f418f932ea5f2c3c8950f5aa4186e Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Wed, 8 Nov 2023 01:20:21 +0330 Subject: [PATCH 13/17] Use ConnectorRestarter class instead of old function --- connector_guardian.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector_guardian.py b/connector_guardian.py index eb23a86..c65a0fc 100644 --- a/connector_guardian.py +++ b/connector_guardian.py @@ -2,7 +2,7 @@ from dotenv import load_dotenv import os -from functionalities.connector_restart import restart_failed_connectors_and_tasks +from functionalities.connector_restart import ConnectorRestarter from utils.rich_utils import MyRichLogHandler load_dotenv() @@ -19,4 +19,4 @@ logging.info("Start [b green]Restarting[/b green] failed connectors") -restart_failed_connectors_and_tasks() \ No newline at end of file +ConnectorRestarter().restart_failed_connectors_and_tasks() \ No newline at end of file From 4e03304213ac66b59b99b2c077ee277a5e354c12 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Thu, 9 Nov 2023 23:56:22 +0330 Subject: [PATCH 14/17] Update README for V0.3.0 --- README.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 30b9698..caa3022 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ Connector's Guardian interact with Kafka Connect cluster using its [rest api](ht * **Auto Connector Restart**: It check status of connectors and tasks and restart if they are failed. **Available from [V0.1.0](https://github.com/anvaari/connector-guardian/releases/tag/0.1.0)** +* **Restart Back Off**: The restarts will happen always after an increasing time period. The first restart will happen immediately at first run. If it does not help, the next restart will happen only after `EXPONENTIAL_RATIO ^ 1` run. If even the second restart doesn’t help, the next restart will be done only after another `EXPONENTIAL_RATIO ^ 1` run. And so on. This leaves more time for the root cause of the failure to be resolved. Thanks to this back-off mechanism, even if the network outage takes over these minutes, the auto-restart will help your connectors to recover from it. The last Restart (= `MAX_RESTART`) restart will happen after `EXPONENTIAL_RATIO ^ 1` run from the initial failure. But if the issue is not resolved even after the last restart, the Guardian will stop restarting and it is up to you to solve it manually. + ## Usage ### Container image @@ -43,7 +45,7 @@ After deploying, it creates 1 pod which run a `connector_guardian.py` every 5 mi ### Environment variables -In order to use Docker image you need to set some environment variables: +In order to use Docker image, [docker-compose](./deploy/docker-compose.yaml) or [helm chart](./deploy/chart/), you need to set some environment variables: * `KAFKA_CONNECT_HOST`: Default = `localhost` * Host of your kafka connect cluster (without `http` or `https` and any `/` at the end and also port) @@ -53,6 +55,10 @@ In order to use Docker image you need to set some environment variables: * Protocol for kafka connect host. Should be `http` and `https` * `KAFKA_CONNECT_USER`: Default = `''` * `KAFKA_CONNECT_PASS`: Default = `''` +* `MAX_RESTART` : Default = `7` + * Maximum number of continuouse restart for each connector +* `EXPONENTIAL_RATIO`: Default = `1` + * Exponential ratio to increase sleep between each connector restart. **Note:** Set values for `KAFKA_CONNECT_USER` and `KAFKA_CONNECT_PASS` only if Kafka Connect cluster need basic authentication otherwise don't set them. @@ -68,3 +74,7 @@ First version of connector guardian which use simple bash script which restart f * Add helm chart thanks to [Amin](https://github.com/alashti) * Add `docker-compose.yaml` so connector guardian can be used for non-cloud environment * `KAFKA_CONNECT_PROTO` changed to `KAFKA_CONNECT_PROTOCOL` + +### [0.3.0](https://github.com/anvaari/connector-guardian/releases/tag/0.3.0) +* Add restart back off mechanism. Read more [here](#features) +* Some enhancement on helm chart \ No newline at end of file From 683052cd5a8dee5de0528729a7396a7d5fb08862 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Fri, 10 Nov 2023 00:24:06 +0330 Subject: [PATCH 15/17] Add is_enabled to BackOffConfs --- configs/configs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/configs/configs.py b/configs/configs.py index 4245499..f66f9aa 100644 --- a/configs/configs.py +++ b/configs/configs.py @@ -5,5 +5,6 @@ class BackOffConfs: max_restart : int = int(getenv("MAX_RESTART",7)) exponential_ratio : int = int(getenv("EXPONENTIAL_RATIO",1)) + is_enabled : bool = bool(int(getenv("ENABLE_BACKOFF",1))) From 91d20e158d9abfc59aea2ce6b306e7664eaf9143 Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Fri, 10 Nov 2023 00:35:38 +0330 Subject: [PATCH 16/17] Add is_enabled to connector restart Connectors will always restart --- functionalities/connector_restart.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/functionalities/connector_restart.py b/functionalities/connector_restart.py index 133d9f0..7e8524a 100644 --- a/functionalities/connector_restart.py +++ b/functionalities/connector_restart.py @@ -51,6 +51,8 @@ def remove_healthy_from_guardian_memory_task(self, failed_tasks: list[tuple[str, def should_connector_restart(self, reset:int, seen:int) -> bool: + if not BackOffConfs.is_enabled : + return True if reset == BackOffConfs.max_restart: return False elif seen == 0 or log(seen,BackOffConfs.exponential_ratio).is_integer(): @@ -77,7 +79,8 @@ def restart_connector(self, connector:str) -> None: "was [red]failed[/red]") self.guardian_memory_connector[connector] = [reset + 1, seen + 1] - save_as_pickle(self.guardian_memory_connector, "guardian_memory_connector.pickle") + if BackOffConfs.is_enabled: + save_as_pickle(self.guardian_memory_connector, "guardian_memory_connector.pickle") def restart_task(self, connector:str, task_id:int) -> None: reset, seen = self.guardian_memory_task.get(connector,{}).get(task_id,[0,0]) @@ -104,7 +107,8 @@ def restart_task(self, connector:str, task_id:int) -> None: self.guardian_memory_task.update( {connector:{task_id:[reset + 1, seen + 1]}} ) - save_as_pickle(self.guardian_memory_task, "guardian_memory_task.pickle") + if BackOffConfs.is_enabled: + save_as_pickle(self.guardian_memory_task, "guardian_memory_task.pickle") def restart_failed_connectors_and_tasks(self) -> None: connectors_status = get_connectors_status() From 9614f074ad0bc21e2c58bc6be0d510ed3edffabe Mon Sep 17 00:00:00 2001 From: Mohammad Anvaari Date: Fri, 10 Nov 2023 00:36:21 +0330 Subject: [PATCH 17/17] Add description for ENABLE_BACKOFF --- README.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index caa3022..c2b2fc3 100644 --- a/README.md +++ b/README.md @@ -55,8 +55,11 @@ In order to use Docker image, [docker-compose](./deploy/docker-compose.yaml) or * Protocol for kafka connect host. Should be `http` and `https` * `KAFKA_CONNECT_USER`: Default = `''` * `KAFKA_CONNECT_PASS`: Default = `''` +* `ENABLE_BACKOFF`: Default = `1` + * Whether restart back off mechanism should be enabled of not + * You should specify this with `0` or `1` every other value considered as `1` * `MAX_RESTART` : Default = `7` - * Maximum number of continuouse restart for each connector + * Maximum number of continuous restart for each connector * `EXPONENTIAL_RATIO`: Default = `1` * Exponential ratio to increase sleep between each connector restart. @@ -76,5 +79,7 @@ First version of connector guardian which use simple bash script which restart f * `KAFKA_CONNECT_PROTO` changed to `KAFKA_CONNECT_PROTOCOL` ### [0.3.0](https://github.com/anvaari/connector-guardian/releases/tag/0.3.0) + * Add restart back off mechanism. Read more [here](#features) -* Some enhancement on helm chart \ No newline at end of file +* Some enhancement on helm chart +* Solve [#11](https://github.com/anvaari/connector-guardian/issues/11) and [#12](https://github.com/anvaari/connector-guardian/issues/12)