Skip to content

Commit

Permalink
Merge pull request #1 from sentoz/sidecar
Browse files Browse the repository at this point in the history
add Sidecar mode
  • Loading branch information
sentoz authored Oct 23, 2021
2 parents 6d32911 + f8200d3 commit a3a8eed
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 30 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ ARG JQ_VERSION="1.6"

LABEL maintainer="[email protected]"

ENV SIDECAR_MODE=false
ENV KAFKA_CONNECT_HOST=localhost
ENV KAFKA_CONNECT_PORT=8083

ENV REQUEST_DELAY=30

RUN apk add --update --no-cache \
bash curl sort && \
Expand Down
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
# Kafka Connectors Restarter <!-- omit in toc -->

* [Description](#description)
* [Quick Start Guides](#quick-start-guides)
* [Dependencies](#dependencies)
* [Environment variables](#environment-variables)
* [Plans for the future](#plans-for-the-future)

## Description

Container for creating a cronjob, which, through the kafka connect API, checks the state of the connector and restarts it if necessary
A container that, through the kafka connect API, checks the state of the connector and, if necessary, restarts it.

Supports work in two modes:

* cron job
* sidecar

## Quick Start Guides

* [Run in kubernetes as a cron job](docs/cronjob.md)
* [Run in kubernetes as a sidecar container](docs/sidecar.md)

## Dependencies

Expand All @@ -21,12 +31,10 @@ Required dependencies:
```yaml
KAFKA_CONNECT_HOST=localhost
KAFKA_CONNECT_PORT=8083
REQUEST_DELAY=30
SIDECAR_MODE=false
```

## Plans for the future

* implement the ability to work as a sidecar as a container

<!--
Title: Kafka Connectors Restarte
Description: Restart you connectors in Kafka Connect.
Expand Down
66 changes: 43 additions & 23 deletions connector-restart
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,53 @@ warn () {
else log WARNING "$*"; fi
}


#Set vars
kafka_connect_host="${kafka_connect_host:-$KAFKA_CONNECT_HOST}"
kafka_connect_port="${kafka_connect_port:-$KAFKA_CONNECT_PORT}"

: "${SIDECAR_MODE:=false}"
: "${KAFKA_CONNECT_HOST:=localhost}"
: "${KAFKA_CONNECT_PORT:=8083}"
: "${REQUEST_DELAY:=30}"

#Print vars
info KAFKA_CONNECT_HOST="$kafka_connect_host"
info KAFKA_CONNECT_PORT="$kafka_connect_port"



connectorsjson=$(
curl -s "http://$kafka_connect_host:$kafka_connect_port/connectors?expand=status"
)
info SIDECAR_MODE="$SIDECAR_MODE"
info KAFKA_CONNECT_HOST="$KAFKA_CONNECT_HOST"
info KAFKA_CONNECT_PORT="$KAFKA_CONNECT_PORT"
info REQUEST_DELAY="$REQUEST_DELAY"

status=$?
if [ $status -eq 0 ]
if [ "$SIDECAR_MODE" = true ]
then

# List current connectors and status
echo connectorsjson | \
jq -c -M 'map({name: .status.name } + {tasks: .status.tasks}) | .[] | {task: ((.tasks[]) + {name: .name})} | select(.task.state=="FAILED") | {name: .task.name, task_id: .task.id|tostring} | ("/connectors/"+ .name + "/tasks/" + .task_id + "/restart")' | \
xargs -I{connector_and_task} curl -v -X POST "http://$kafka_connect_host:$kafka_connect_port"\{connector_and_task\}

while true
do
if connectorsjson=$(
curl -Lff "http://$KAFKA_CONNECT_HOST:$KAFKA_CONNECT_PORT/connectors?expand=status"
)
then
# List current connectors and status
echo connectorsjson | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
# Restart any connector tasks that are FAILED
echo connectorsjson | \
jq -c -M 'map({name: .status.name } + {tasks: .status.tasks}) | .[] | {task: ((.tasks[]) + {name: .name})} | select(.task.state=="FAILED") | {name: .task.name, task_id: .task.id|tostring} | ("/connectors/"+ .name + "/tasks/" + .task_id + "/restart")' | \
xargs -I{connector_and_task} curl -v -X POST "http://$KAFKA_CONNECT_HOST:$KAFKA_CONNECT_PORT"\{connector_and_task\}
else
fail 'Connect to API Kafka Connect' \
http://$KAFKA_CONNECT_HOST:$KAFKA_CONNECT_PORT \
'failed with status code' $? 'and message' $connectorsjson
fi
sleep "$REQUEST_DELAY"m
done
else
fail 'Connect to API Kafka Connect' \
http://$kafka_connect_host:$kafka_connect_port \
'failed with status code' $status
if connectorsjson=$(
curl -Lff "http://$KAFKA_CONNECT_HOST:$KAFKA_CONNECT_PORT/connectors?expand=status"
)
then
# Restart any connector tasks that are FAILED
echo connectorsjson | \
jq -c -M 'map({name: .status.name } + {tasks: .status.tasks}) | .[] | {task: ((.tasks[]) + {name: .name})} | select(.task.state=="FAILED") | {name: .task.name, task_id: .task.id|tostring} | ("/connectors/"+ .name + "/tasks/" + .task_id + "/restart")' | \
xargs -I{connector_and_task} curl -v -X POST "http://$KAFKA_CONNECT_HOST:$KAFKA_CONNECT_PORT"\{connector_and_task\}
else
fail 'Connect to API Kafka Connect' \
http://$KAFKA_CONNECT_HOST:$KAFKA_CONNECT_PORT \
'failed with status code' $? 'and message' $connectorsjson
fi
fi
33 changes: 33 additions & 0 deletions deploy/cronjob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
- apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: connector-restart-cronjob
spec:
schedule: "*/5 * * * *"
jobTemplate:
spec:
template:
metadata:
labels:
parent: "cronjob-connector-restart"
spec:
containers:
- name: cronjob-connector-restart
image: {IMAGE_URL}
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 10m
memory: 10Mi
requests:
cpu: 50m
memory: 50Mi
env:
- name: KAFKA_CONNECT_HOST
value: 'kafka-connect'
- name: KAFKA_CONNECT_PORT
value: '8083'
- name: SIDECAR_MODE
value: 'false'
restartPolicy: OnFailure
80 changes: 80 additions & 0 deletions deploy/sidecar.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
---
- kind: Deployment
apiVersion: apps/v1
metadata:
name: kafka-connect
app: kafka-connect
spec:
strategy:
type: Recreate
triggers:
- type: ConfigChange
replicas: 1
selector:
matchLabels:
name: kafka-connect
template:
metadata:
labels:
template: kafka-connect
name: kafka-connect
app: kafka
spec:
containers:
- name: connector-restart
image: {IMAGE_URL}
env:
- name: KAFKA_CONNECT_HOST
value: localhost
- name: KAFKA_CONNECT_PORT
value: "8083"
- name: REQUEST_DELAY
value: "30"
- name: SIDECAR_MODE
value: "true"
resources:
limits:
cpu: '10m'
memory: '10Mi'
requests:
cpu: '50m'
memory: '50Mi'
- name: kafka-connect
image: debezium/connect:1.4
ports:
- containerPort: 8080
protocol: TCP
- containerPort: 8443
protocol: TCP
- containerPort: 7600
protocol: TCP
- containerPort: 57600
protocol: TCP
resources:
limits:
cpu: 500m
memory: 1500Mi
requests:
cpu: 500m
memory: 1500Mi
env:
- name: REST_HOST_NAME
value: localhost
...
...
...
restartPolicy: Always
dnsPolicy: ClusterFirst
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- kafka-connect
namespaces:
- NAMESPACE
topologyKey: kubernetes.io/hostname
enableServiceLinks: false
30 changes: 30 additions & 0 deletions docs/cronjob.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Run in kubernetes as a cron job

Here's how to set up a container to run in cron job mode.

You can read about CronJobs in [doc](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/)

* [Run in kubernetes as a cron job](#run-in-kubernetes-as-a-cron-job)
* [Configuring kafka connect](#configuring-kafka-connect)
* [Deploy cron job](#deploy-cron-job)

## Configuring kafka connect

The main thing that needs to be done is:

* create a service for kafka connect
* ask the value of the `REST_HOST_NAME` variable is equal to the service name

## Deploy cron job

> ⚠️ **Attention!** ⚠️
>
> This is an example to demonstrate how it works.
> Do not do this in a production environment,
> this is just an example.
>
> First, customize the manifest to suit your environment.
```shell
kubectl apply -n kube-dump -f deploy/cronjob.yaml
```
29 changes: 29 additions & 0 deletions docs/sidecar.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Run in kubernetes as a sidecar container

Here's how to set up a container to run in sidecar mode.

You can read about sidecar in [doc](https://kubernetes.io/docs/concepts/workloads/pods/#workload-resources-for-managing-pods)

* [Run in kubernetes as a sidecar container](#run-in-kubernetes-as-a-sidecar-container)
* [Configuring kafka connect](#configuring-kafka-connect)
* [Deploy sidecar](#deploy-sidecar)

## Configuring kafka connect

The main thing that needs to be done is:

* ask the value of the `REST_HOST_NAME` variable is equal to the **localhost**

## Deploy sidecar

> ⚠️ **Attention!** ⚠️
>
> This is an example to demonstrate how it works.
> Do not do this in a production environment,
> this is just an example.
>
> First, customize the manifest to suit your environment.
```shell
kubectl apply -n kube-dump -f deploy/sidecar.yaml
```

0 comments on commit a3a8eed

Please sign in to comment.