Commit
Merge pull request #14 from anvaari/add-restart-backoff
Add restart back off
anvaari authored Nov 9, 2023
2 parents a9c3e51 + 9614f07 commit 701789b
Showing 11 changed files with 224 additions and 81 deletions.
3 changes: 3 additions & 0 deletions .env.sample
@@ -6,4 +6,7 @@ KAFKA_CONNECT_PORT="8083"
KAFKA_CONNECT_USER=""
KAFKA_CONNECT_PASS=""

MAX_RESTART=7
EXPONENTIAL_RATIO=1


17 changes: 16 additions & 1 deletion README.md
@@ -10,6 +10,8 @@ Connector's Guardian interact with Kafka Connect cluster using its [rest api](ht

* **Auto Connector Restart**: It checks the status of connectors and tasks and restarts them if they have failed. **Available from [V0.1.0](https://github.com/anvaari/connector-guardian/releases/tag/0.1.0)**

* **Restart Back Off**: Restarts are spaced out over an increasing number of runs. The first restart happens immediately, on the first run that sees the failure. If it does not help, the next restart happens only after `EXPONENTIAL_RATIO ^ 1` runs, the one after that only after `EXPONENTIAL_RATIO ^ 2` runs, and so on, up to `MAX_RESTART` restarts in total (see the sketch below). This leaves more time for the root cause of the failure to be resolved, so even if, say, a network outage outlasts a few check intervals, the auto-restart can still recover your connectors. If the issue persists after the last restart, the Guardian stops restarting and it is up to you to solve it manually. **Available from [V0.3.0](https://github.com/anvaari/connector-guardian/releases/tag/0.3.0)**
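
For intuition, here is a minimal sketch of the schedule check. It is an illustrative re-implementation, not the project's API, and it assumes `EXPONENTIAL_RATIO` > 1 (a logarithm with base 1 is undefined):

```python
from math import log

def due_for_restart(restarts_done: int, runs_seen: int,
                    max_restart: int = 7, ratio: int = 2) -> bool:
    # Stop once the restart budget is spent; otherwise restart on the first
    # run and on every run where the runs seen so far are a power of `ratio`.
    if restarts_done == max_restart:
        return False
    return runs_seen == 0 or log(runs_seen, ratio).is_integer()

# With ratio=2, restarts fire on runs 0, 1, 2 and 4 (powers of 2):
print([run for run in range(6) if due_for_restart(0, run)])  # [0, 1, 2, 4]
```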

## Usage

### Container image
@@ -43,7 +45,7 @@ After deploying, it creates 1 pod which run a `connector_guardian.py` every 5 mi

### Environment variables

In order to use Docker image you need to set some environment variables:
In order to use the Docker image, [docker-compose](./deploy/docker-compose.yaml), or [helm chart](./deploy/chart/), you need to set some environment variables:

* `KAFKA_CONNECT_HOST`: Default = `localhost`
* Host of your Kafka Connect cluster (without the `http`/`https` scheme, without any trailing `/`, and without the port)
@@ -53,6 +55,13 @@ In order to use Docker image you need to set some environment variables:
* Protocol for the Kafka Connect host. Must be `http` or `https`
* `KAFKA_CONNECT_USER`: Default = `''`
* `KAFKA_CONNECT_PASS`: Default = `''`
* `ENABLE_BACKOFF`: Default = `1`
* Whether the restart back off mechanism should be enabled or not
* Specify `0` or `1`; every other value is treated as `1`
* `MAX_RESTART`: Default = `7`
* Maximum number of consecutive restarts for each connector
* `EXPONENTIAL_RATIO`: Default = `1`
* Exponential ratio that increases the number of runs to wait between successive restarts of a connector

**Note:** Set `KAFKA_CONNECT_USER` and `KAFKA_CONNECT_PASS` only if the Kafka Connect cluster requires basic authentication; otherwise, leave them unset.

@@ -68,3 +77,9 @@ First version of connector guardian which use simple bash script which restart f
* Add helm chart thanks to [Amin](https://github.com/alashti)
* Add `docker-compose.yaml` so connector guardian can be used for non-cloud environment
* `KAFKA_CONNECT_PROTO` changed to `KAFKA_CONNECT_PROTOCOL`

### [0.3.0](https://github.com/anvaari/connector-guardian/releases/tag/0.3.0)

* Add restart back off mechanism. Read more [here](#features)
* Some enhancements to the helm chart
* Solve [#11](https://github.com/anvaari/connector-guardian/issues/11) and [#12](https://github.com/anvaari/connector-guardian/issues/12)
Empty file added configs/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions configs/configs.py
@@ -0,0 +1,10 @@
from dataclasses import dataclass
from os import getenv

@dataclass(frozen=True)
class BackOffConfs:
    max_restart: int = int(getenv("MAX_RESTART", "7"))
    exponential_ratio: int = int(getenv("EXPONENTIAL_RATIO", "1"))
    is_enabled: bool = bool(int(getenv("ENABLE_BACKOFF", "1")))
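
A quick usage sketch: the frozen dataclass captures the environment once, at import time, so the variables must be set before the first import.

```python
from configs.configs import BackOffConfs

print(BackOffConfs.max_restart)        # 7 unless MAX_RESTART is set
print(BackOffConfs.exponential_ratio)  # 1 unless EXPONENTIAL_RATIO is set
print(BackOffConfs.is_enabled)         # True; ENABLE_BACKOFF="0" gives False
```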


4 changes: 2 additions & 2 deletions connector_guardian.py
@@ -2,7 +2,7 @@
from dotenv import load_dotenv
import os

from functionalities.connector_restart import restart_failed_connectors_and_tasks
from functionalities.connector_restart import ConnectorRestarter
from utils.rich_utils import MyRichLogHandler

load_dotenv()
@@ -19,4 +19,4 @@

logging.info("Start [b green]Restarting[/b green] failed connectors")

restart_failed_connectors_and_tasks()
ConnectorRestarter().restart_failed_connectors_and_tasks()
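
For local testing, a rough Python equivalent of the shell loop the deployment runs (see `deployment.yaml` below); the script path is the one used in the chart:

```python
import subprocess
import time

# Mirrors the chart's loop:
#   while true; do python /connector-guardian/connector_guardian.py; sleep 300; done
while True:
    subprocess.run(["python", "/connector-guardian/connector_guardian.py"], check=False)
    time.sleep(300)  # run the guardian every 5 minutes
```
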
11 changes: 11 additions & 0 deletions deploy/chart/templates/config-map.yml
@@ -0,0 +1,11 @@
{{- with .Values.guardianConfig.env }}
{{- if .}}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .name }}
data:
{{- toYaml .data | nindent 2 }}
{{- end }}
{{- end }}

12 changes: 7 additions & 5 deletions deploy/chart/templates/deployment.yaml
@@ -3,7 +3,7 @@ kind: Deployment
metadata:
name: connector-guardian
spec:
replicas: {.Values.spec.replicas}
replicas: {{ .Values.gaurdianSpec.replicas }}
selector:
matchLabels:
app: connector-guardian
@@ -14,10 +14,12 @@ spec:
spec:
containers:
- name: connector-guardian
image: {{ .Values.guard.image }}
image: {{ .Values.gaurdianSpec.image }}
command: ["/bin/sh"]
args: ["-c", "while true; do python /connector-guardian/connector_guardian.py; sleep 300; done"]
resources:
{{- toYaml ".Values.containers.resources" | indent 4 }}
env:
{{- toYaml ".Values.containers.env" | indent 4 }}
{{- toYaml .Values.gaurdianSpec.resources | nindent 10 }}
envFrom:
- configMapRef:
name: {{ .Values.guardianConfig.env.name }}

40 changes: 19 additions & 21 deletions deploy/chart/values.yaml
@@ -1,24 +1,22 @@
guard:
gaurdianSpec:
image: docker.io/anvaari/connector-guardian:0.2.0
spec:
replicas: 1
resources:
limits:
cpu: 50m
memory: 100Mi
requests:
cpu: 20m
memory: 20Mi

containers:
resources:
limits:
cpu: 50m
memory: 100Mi
requests:
cpu: 20m
memory: 20Mi
env:
- name: KAFKA_CONNECT_HOST
value: 'localhost'
- name: KAFKA_CONNECT_USER
value: ''
- name: KAFKA_CONNECT_PASS
value: ''
- name: KAFKA_CONNECT_PORT
value: '8083'
- name: KAFKA_CONNECT_PROTOCOL
value: 'http'
guardianConfig:
env:
name: guardian-configs
data:
KAFKA_CONNECT_HOST: localhost
KAFKA_CONNECT_USER: ''
KAFKA_CONNECT_PASS: ''
KAFKA_CONNECT_PORT: '8083'
KAFKA_CONNECT_PROTOCOL: 'http'
MAX_RESTART: '7'
EXPONENTIAL_RATIO: '1'
2 changes: 2 additions & 0 deletions deploy/docker-compose.yaml
@@ -12,3 +12,5 @@ services:
- KAFKA_CONNECT_PORT=8083
- KAFKA_CONNECT_PROTOCOL=http
- LOG_LEVEL=info
- MAX_RESTART=7
- EXPONENTIAL_RATIO=1
167 changes: 115 additions & 52 deletions functionalities/connector_restart.py
@@ -1,10 +1,12 @@
import logging
import os
from dotenv import load_dotenv
from rich.logging import RichHandler
from math import log
from utils.pickle_utils import save_as_pickle,load_dict_pickle
from utils.kafka_connect_utils import (get_connectors_status,
restart_connector,
restart_task)
from configs.configs import BackOffConfs

load_dotenv()

@@ -13,60 +15,121 @@
logger = logging.getLogger(__name__)
logger.setLevel(log_level)

def extract_failed_connectors(connectors_status:dict) -> tuple:
failed_connectors = tuple(
map(
lambda x:x[0],
filter(
lambda x:x[1]['connector'] == 'FAILED',
connectors_status.items()
)
class ConnectorRestarter():

def __init__(self) -> None:
self.guardian_memory_connector:dict[str,list[int]] = load_dict_pickle("guardian_memory_connector.pickle")
self.guardian_memory_task:dict[str,dict[int,list[int]]] = load_dict_pickle("guardian_memory_task.pickle")

def extract_failed_connectors(self, connectors_status: dict) -> list[str]:
return [connector for connector, status in connectors_status.items() if status['connector'] == 'FAILED']

def extract_failed_tasks(self, connectors_status: dict) -> list[tuple[str,int]]:
failed_tasks = []
for conn in connectors_status:
tasks_stat = connectors_status[conn]['tasks']
for task_id in tasks_stat:
if tasks_stat[task_id] == 'FAILED':
failed_tasks.append((conn,task_id))
return failed_tasks

def remove_healthy_from_guardian_memory_connector(self, failed_connectors: list[str]) -> None:
self.guardian_memory_connector:dict[str,list[int]] = {conn: status
for conn, status in self.guardian_memory_connector.items()
if conn in failed_connectors}

def remove_healthy_from_guardian_memory_task(self, failed_tasks: list[tuple[str,int]]):
        for conn in self.guardian_memory_task:
            # iterate over a snapshot of the keys: popping from the dict
            # while iterating it directly raises RuntimeError
            for task_id in list(self.guardian_memory_task[conn].keys()):
                if (conn, task_id) not in failed_tasks:
                    self.guardian_memory_task[conn].pop(task_id)

self.guardian_memory_task = dict(
filter(lambda x:x[1] != {},self.guardian_memory_task.items())
)
)
return failed_connectors

def extract_failed_tasks(connectors_status:dict) -> list:
failed_tasks = []
for conn in connectors_status:
tasks_stat = connectors_status[conn]['tasks']
for task_id in tasks_stat:
if tasks_stat[task_id] == 'FAILED':
failed_tasks.append((conn,task_id))

return failed_tasks

def restart_failed_connectors_and_tasks():
connectors_status = get_connectors_status()
if not connectors_status:
logger.critical("Can't get [b]status[/b] of "
"connectors. Please check the logs")
return None

    def should_connector_restart(self,
                                 reset: int,
                                 seen: int) -> bool:
        if not BackOffConfs.is_enabled:
            return True
        if reset == BackOffConfs.max_restart:
            return False
        if BackOffConfs.exponential_ratio <= 1:
            # log base 1 is undefined (math.log raises ZeroDivisionError),
            # so a ratio of 1 means no growth: restart on every run
            return True
        return seen == 0 or log(seen, BackOffConfs.exponential_ratio).is_integer()

failed_connectors = extract_failed_connectors(connectors_status)
for conn in failed_connectors:
logger.info(f"Restarting [b]{conn}[/b]..")
restart_status = restart_connector(conn)
if restart_status == True:
logger.info(f"[b]{conn}[/b] "
"Restarted [green]successfully[/green]")
def restart_connector(self, connector:str) -> None:
reset, seen = self.guardian_memory_connector.get(connector, [0, 0])

if not self.should_connector_restart(reset, seen):
logger.debug(f"{connector} will be restarted later.")
self.guardian_memory_connector[connector] = [reset, seen + 1]
return

restart_status = restart_connector(connector)
if restart_status:
            logger.info(f"[b]{connector}[/b] "
                        f"Restarted {reset + 1} out of "
                        f"{BackOffConfs.max_restart} times "
                        "[green]successfully[/green]")
else:
logger.error(f"Restarting [b]{conn}[/b] "
"was [red]failed[/red]")

failed_tasks = extract_failed_tasks(connectors_status)
for conn,task_id in failed_tasks:
logger.info(f"Restarting task [i]{task_id}[/i] of "
f"[b]{conn}[/b]..")
restart_status = restart_task(conn,task_id)
if restart_status == True:
logger.error(f"Restarting [b]{connector}[/b] "
"was [red]failed[/red]")

self.guardian_memory_connector[connector] = [reset + 1, seen + 1]
if BackOffConfs.is_enabled:
save_as_pickle(self.guardian_memory_connector, "guardian_memory_connector.pickle")

def restart_task(self, connector:str, task_id:int) -> None:
reset, seen = self.guardian_memory_task.get(connector,{}).get(task_id,[0,0])

if not self.should_connector_restart(reset, seen):
logger.debug(f"Task {task_id} of [b]{connector}[/b] "
"will be restarted later.")
            # update only this task's entry; replacing the whole inner dict
            # would drop the connector's other tasks
            self.guardian_memory_task.setdefault(connector, {})[task_id] = [reset, seen + 1]
return

restart_status = restart_task(connector, task_id)
if restart_status:
logger.info(f"task [i]{task_id}[/i] of "
f"[b]{conn}[/b] "
"Restarted [green]successfully[/green]")
f"[b]{connector}[/b] "
f"Restarted {reset} out of "
f"{BackOffConfs.max_restart} times "
"[green]successfully[/green]")
else:
logger.error(f"Restarting task [i]{task_id}[/i] of "
f"[b]{conn}[/b] "
"was [red]failed[/red]")
if not failed_connectors and not failed_tasks:
logger.info("All tasks and connectors are "
"[green]healthy[/green] "
"[yellow]:)[/yellow]")
f"[b]{connector}[/b] "
"was [red]failed[/red]")
        self.guardian_memory_task.setdefault(connector, {})[task_id] = [reset + 1, seen + 1]
if BackOffConfs.is_enabled:
save_as_pickle(self.guardian_memory_task, "guardian_memory_task.pickle")

def restart_failed_connectors_and_tasks(self) -> None:
connectors_status = get_connectors_status()
if not connectors_status:
logger.critical("Can't get [b]status[/b] of "
"connectors. Please check the logs")
return

failed_connectors = self.extract_failed_connectors(connectors_status)
self.remove_healthy_from_guardian_memory_connector(failed_connectors)

for conn in failed_connectors:
self.restart_connector(conn)

failed_tasks = self.extract_failed_tasks(connectors_status)
self.remove_healthy_from_guardian_memory_task(failed_tasks)

for conn,task_id in failed_tasks:
self.restart_task(conn, task_id)

if not failed_connectors and not failed_tasks:
logger.info("All tasks and connectors are "
"[green]healthy[/green] "
"[yellow]:)[/yellow]")
39 changes: 39 additions & 0 deletions utils/pickle_utils.py
@@ -0,0 +1,39 @@
import pickle
import os
import pathlib
import logging
from dotenv import load_dotenv

load_dotenv()

log_level = os.getenv('LOG_LEVEL','info').upper()
script_path=os.path.dirname(os.path.abspath(__file__))
project_path = str(pathlib.Path(script_path).parent.absolute())

logger = logging.getLogger(__name__)
logger.setLevel(log_level)

def save_as_pickle(python_object:list|dict|tuple,file_name:str) -> None:
with open(os.path.join(project_path,file_name),'wb') as fp:
try:
pickle.dump(python_object,fp)
except Exception as e:
            logger.error(f"Can't save {file_name} "
                         f"file as pickle.\n{e}", exc_info=True)
        else:
            logger.debug("Saved pickle file successfully")

def load_dict_pickle(file_name:str) -> dict:
    if not os.path.isfile(os.path.join(project_path,file_name)):
        return dict()
    # open in read-binary mode ('rb'); 'wb' would truncate the file
    with open(os.path.join(project_path,file_name),'rb') as fp:
        try:
            python_object = pickle.load(fp)
        except Exception as e:
            logger.error(f"Can't load {file_name} "
                         "file as pickle, an empty dict "
                         f"will be returned.\n{e}", exc_info=True)
            return dict()
        else:
            logger.debug("pickle file loaded successfully")
            return python_object
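
A round-trip sketch of these helpers (the file name is made up; files live next to the project root):

```python
from utils.pickle_utils import save_as_pickle, load_dict_pickle

state = {"my-connector": [1, 3]}              # [restarts_done, runs_seen]
save_as_pickle(state, "demo_state.pickle")
print(load_dict_pickle("demo_state.pickle"))  # {'my-connector': [1, 3]}
print(load_dict_pickle("missing.pickle"))     # {} when the file doesn't exist
```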
