
Commit

Merge pull request #9 from anvaari/migrate-to-python
Migrate to python
anvaari authored Sep 23, 2023
2 parents b3e3657 + f17a62d commit d635f7b
Showing 20 changed files with 304 additions and 153 deletions.
6 changes: 6 additions & 0 deletions .dockerignore
@@ -0,0 +1,6 @@
./github
./gitignore
./deploy
.env*
./venv
./.git
9 changes: 9 additions & 0 deletions .env.sample
@@ -0,0 +1,9 @@
LOG_LEVEL="INFO"

KAFKA_CONNECT_HOST="localhost"
KAFKA_CONNECT_PROTOCOL="http"
KAFKA_CONNECT_PORT="8083"
KAFKA_CONNECT_USER=""
KAFKA_CONNECT_PASS=""


5 changes: 4 additions & 1 deletion .gitignore
@@ -1 +1,4 @@
.vscode
.vscode
venv
.env
__pycache__
12 changes: 6 additions & 6 deletions Dockerfile
@@ -1,11 +1,11 @@
ARG ALPINE_VERSION=3.16
ARG PYTHON_VERSION=3.9

FROM alpine:${ALPINE_VERSION}
FROM python:${PYTHON_VERSION}-alpine

LABEL maintainer="[email protected]"

RUN apk update && \
apk add --update --no-cache \
bash curl util-linux jq
COPY . /connector-guardian

RUN pip install -U pip && \
pip install -r /connector-guardian/requirements.txt

COPY ./connector-restart /connector-restart
49 changes: 36 additions & 13 deletions README.md
@@ -1,47 +1,70 @@
# Connector's Guardian

The guardian you need for your Kafka Connect connectors. It checks the status of connectors and tasks and restarts any that have failed.
The guardian you need for your Kafka Connect connectors.

## How It Works

Connector's Guardian interacts with the Kafka Connect cluster using its [REST API](https://docs.confluent.io/platform/current/connect/references/restapi.html) and parses the returned JSON with [jq](https://github.com/jqlang/jq).
Connector's Guardian interacts with the Kafka Connect cluster using its [REST API](https://docs.confluent.io/platform/current/connect/references/restapi.html) and parses the returned JSON with [jq](https://github.com/jqlang/jq) (in version [0.1.0](https://github.com/anvaari/connector-guardian/releases/tag/0.1.0)) or with Python's `json` library (from version [0.2.0](https://github.com/anvaari/connector-guardian/releases/tag/0.2.0)).
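
For example, a minimal status check against the REST API could look like this in Python (a rough sketch; the host, port, and connector data are placeholders, not values from this repository):

```python
import requests

# Placeholder address of a Kafka Connect worker; adjust to your cluster.
BASE_URL = "http://localhost:8083"

# The Kafka Connect REST API can return connector status inline via ?expand=status.
resp = requests.get(f"{BASE_URL}/connectors", params={"expand": "status"}, timeout=10)
resp.raise_for_status()

for name, info in resp.json().items():
    state = info["status"]["connector"]["state"]
    print(f"{name}: {state}")  # e.g. RUNNING, FAILED, PAUSED
```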

## Container image
## Features

* **Auto Connector Restart**: checks the status of connectors and tasks and restarts any that have failed. **Available from [v0.1.0](https://github.com/anvaari/connector-guardian/releases/tag/0.1.0)**

## Usage

### Container image

You can pull image from Docker hub:

* [https://hub.docker.com/r/anvaari/connector-guardian](https://hub.docker.com/r/anvaari/connector-guardian)

## Usage
### Non Cloud Environments

The image is optimized for use in k8s/OKD4 environments. You can simply deploy the provided [deployment.yaml](./deploy/deployment.yaml) with `kubectl` (on k8s) or `oc` (on OKD):
You can use the provided [docker-compose](./deploy/docker-compose.yaml) file to deploy Connector's Guardian on your server.

**Important:** Change the [environment variables](#environment-variables) to match your Kafka Connect cluster before deploying.
Before deploying the image, make sure to set the appropriate environment variables in [docker-compose.yaml](./deploy/docker-compose.yaml).

```bash
oc apply -f deploy/deployment.yaml -n {your_namespace_name}
cd deploy
docker compose up -d
```

### Kubernetes or OpenShift

You can use the provided Helm chart (see the guide for installing Helm [here](https://helm.sh/docs/intro/install/)):

Before deploying the chart, make sure to set the appropriate environment variables in [values.yaml](./deploy/chart/values.yaml).

```bash
kubectl apply -f deploy/deployment.yaml -n {your_namespace_name}
helm upgrade connector-guardian --install -n {your_namespace_name} -f deploy/chart/values.yaml deploy/chart
```

After deploying, it creates one pod which runs a bash script every 5 minutes, so all failed connectors and tasks will be restarted.

**Note:** It ignores `PAUSED` connectors, so it doesn't restart failed tasks of `PAUSED` connectors.
After deploying, it creates one pod which runs `connector_guardian.py` every 5 minutes.

### Environment variables

In order to use the Docker image you need to set some environment variables:

* `KAFKA_CONNECT_HOST`: Default = `localhost`
  * Host of your Kafka Connect cluster (without `http://` or `https://`, without a trailing `/`, and without the port)
* `KAFKA_CONNECT_PORT`: Default = `8083`
  * Port of the Kafka Connect cluster's REST API
* `KAFKA_CONNECT_PROTO`: Default = `http`
  * Protocol for the Kafka Connect host. Should be `http` or `https`
* `KAFKA_CONNECT_USER`: Default = `''`
* `KAFKA_CONNECT_PASS`: Default = `''`

**Note:** Set values for `KAFKA_CONNECT_USER` and `KAFKA_CONNECT_PASS` only if the Kafka Connect cluster needs basic authentication; otherwise, don't set them.
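
As a rough sketch (not the code from this repository), the variables above can be combined into a base URL and optional basic-auth credentials like this:

```python
import os

host = os.getenv("KAFKA_CONNECT_HOST", "localhost")
port = os.getenv("KAFKA_CONNECT_PORT", "8083")
# 0.1.0 used KAFKA_CONNECT_PROTO; 0.2.0 renamed it to KAFKA_CONNECT_PROTOCOL (see Change Log).
proto = os.getenv("KAFKA_CONNECT_PROTOCOL", os.getenv("KAFKA_CONNECT_PROTO", "http"))
user = os.getenv("KAFKA_CONNECT_USER", "")
password = os.getenv("KAFKA_CONNECT_PASS", "")

base_url = f"{proto}://{host}:{port}"
# Pass credentials only when basic authentication is enabled on the cluster.
auth = (user, password) if user else None
# e.g. requests.get(f"{base_url}/connectors", auth=auth)
```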

## To Do
## Change Log

### [0.1.0](https://github.com/anvaari/connector-guardian/releases/tag/0.1.0)

First version of Connector's Guardian, which uses a simple bash script to restart failed connectors and tasks on each run.

### [0.2.0](https://github.com/anvaari/connector-guardian/releases/tag/0.2.0)

* Make the Docker image usable in non-k8s environments
* Migrate to Python
* Add Helm chart, thanks to [Amin](https://github.com/alashti)
* Add `docker-compose.yaml` so Connector's Guardian can be used in non-cloud environments
* `KAFKA_CONNECT_PROTO` changed to `KAFKA_CONNECT_PROTOCOL`
131 changes: 0 additions & 131 deletions connector-restart

This file was deleted.

22 changes: 22 additions & 0 deletions connector_guardian.py
@@ -0,0 +1,22 @@
import logging
from dotenv import load_dotenv
import os
from rich.logging import RichHandler
from functionalities.connector_restart import restart_failed_connectors_and_tasks

load_dotenv()

log_level = os.getenv("LOG_LEVEL",'info').upper()

base_log_format = ("%(asctime)s - %(levelname)s "
"in file %(filename)s function "
"%(funcName)s line %(lineno)s : "
"%(message)s")

logging.basicConfig(level=log_level,
format=base_log_format,
handlers=[RichHandler(markup=True)])

logging.info("Start [b green]Restarting[/b green] failed connectors")

restart_failed_connectors_and_tasks()
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions chart/values.yaml → deploy/chart/values.yaml
@@ -1,5 +1,5 @@
guard:
image: docker.io/anvaari/connector-guardian:0.1.0
image: docker.io/anvaari/connector-guardian:0.2.0
spec:
replicas: 1

Expand All @@ -20,5 +20,5 @@ containers:
value: ''
- name: KAFKA_CONNECT_PORT
value: '8083'
- name: KAFKA_CONNECT_PROTO
- name: KAFKA_CONNECT_PROTOCOL
value: 'http'
14 changes: 14 additions & 0 deletions deploy/docker-compose.yaml
@@ -0,0 +1,14 @@
version: '3'

services:
connector-guardian:
image: docker.io/anvaari/connector-guardian:0.2.0
tty: true
command: /bin/sh -c "while true; do python /connector-guardian/connector_guardian.py; sleep 300; done"
environment:
- KAFKA_CONNECT_HOST=localhost
- KAFKA_CONNECT_USER=
- KAFKA_CONNECT_PASS=
- KAFKA_CONNECT_PORT=8083
- KAFKA_CONNECT_PROTOCOL=http
- LOG_LEVEL=info
Empty file added exceptions/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions exceptions/custom_exceptions.py
@@ -0,0 +1,4 @@
class RequestFailedError(Exception):
def __init__(self, url):
self.url = url
super().__init__(f"Failed to make a successful request to {url}")
Empty file added functionalities/__init__.py
Empty file.
72 changes: 72 additions & 0 deletions functionalities/connector_restart.py
@@ -0,0 +1,72 @@
import logging
import os
from dotenv import load_dotenv
from rich.logging import RichHandler
from utils.kafka_connect_utils import (get_connectors_status,
restart_connector,
restart_task)

load_dotenv()

log_level = os.getenv("LOG_LEVEL","info").upper()

logger = logging.getLogger(__name__)
logger.setLevel(log_level)

def extract_failed_connectors(connectors_status:dict) -> tuple:
failed_connectors = tuple(
map(
lambda x:x[0],
filter(
lambda x:x[1]['connector'] == 'FAILED',
connectors_status.items()
)
)
)
return failed_connectors

def extract_failed_tasks(connectors_status:dict) -> list:
failed_tasks = []
for conn in connectors_status:
tasks_stat = connectors_status[conn]['tasks']
for task_id in tasks_stat:
if tasks_stat[task_id] == 'FAILED':
failed_tasks.append((conn,task_id))

return failed_tasks

def restart_failed_connectors_and_tasks():
connectors_status = get_connectors_status()
if not connectors_status:
logger.critical("Can't get [b]status[/b] of "
"connectors. Please check the logs")
return None

failed_connectors = extract_failed_connectors(connectors_status)
for conn in failed_connectors:
logger.info(f"Restarting [b]{conn}[/b]..")
restart_status = restart_connector(conn)
if restart_status == True:
logger.info(f"[b]{conn}[/b] "
"Restarted [green]successfully[/green]")
else:
logger.error(f"Restarting [b]{conn}[/b] "
"was [red]failed[/red]")

failed_tasks = extract_failed_tasks(connectors_status)
for conn,task_id in failed_tasks:
logger.info(f"Restarting task [i]{task_id}[/i] of "
f"[b]{conn}[/b]..")
restart_status = restart_task(conn,task_id)
if restart_status == True:
logger.info(f"task [i]{task_id}[/i] of "
f"[b]{conn}[/b] "
"Restarted [green]successfully[/green]")
else:
logger.error(f"Restarting task [i]{task_id}[/i] of "
f"[b]{conn}[/b] "
"was [red]failed[/red]")
if not failed_connectors and not failed_tasks:
logger.info("All tasks and connectors are "
"[green]healthy[/green] "
"[yellow]:)[/yellow]")
3 changes: 3 additions & 0 deletions requirements.txt
@@ -0,0 +1,3 @@
rich~=13.5
python-dotenv~=1.0
requests~=2.31