Set up Flink session cluster and SQL gateway #85

Merged: 1 commit, Jan 15, 2025
3 changes: 2 additions & 1 deletion .github/workflows/integration-tests.yml
@@ -32,6 +32,7 @@ jobs:
run: |
kind load docker-image hoptimator
kind load docker-image hoptimator-flink-runner
kind load docker-image hoptimator-flink-operator
- name: Deploy Dev Environment
run: make deploy-dev-environment
- name: Deploy Hoptimator
@@ -64,4 +65,4 @@ jobs:
- name: Capture Flink Job Logs
if: always()
run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)

24 changes: 15 additions & 9 deletions Makefile
@@ -8,7 +8,8 @@ test:
build:
./gradlew build
docker build . -t hoptimator
docker build hoptimator-flink-runner -t hoptimator-flink-runner
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-runner -t hoptimator-flink-runner
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-operator -t hoptimator-flink-operator

bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

@@ -47,26 +48,32 @@ deploy-samples: deploy
undeploy-samples: undeploy
kubectl delete -f ./deploy/samples || echo "skipping"

deploy-flink:
deploy-flink: deploy
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f deploy/samples/flinkDeployment.yaml
kubectl apply -f deploy/samples/flinkSessionJob.yaml
docker compose -f ./deploy/docker/flink/docker-compose-sql-gateway.yaml up -d --wait

undeploy-flink:
kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
docker compose -f ./deploy/docker/flink/docker-compose-sql-gateway.yaml down
kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
kubectl delete crd flinksessionjobs.flink.apache.org || echo "skipping"
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"
helm repo remove flink-operator-repo || echo "skipping"

deploy-kafka: deploy deploy-flink
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy/dev
kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/kafkadb.yaml
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
@@ -78,18 +85,17 @@ undeploy-kafka:
kubectl delete -f ./deploy/dev || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
kubectl delete namespace kafka || echo "skipping"
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
deploy-venice: deploy deploy-flink
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml up -d --wait
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml up -d --wait
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-1 schemas/keySchema.avsc schemas/valueSchema.avsc
kubectl apply -f ./deploy/samples/venicedb.yaml

undeploy-venice:
kubectl delete -f ./deploy/samples/venicedb.yaml || echo "skipping"
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml down
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml down

deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice

24 changes: 22 additions & 2 deletions README.md
@@ -48,9 +48,9 @@ The below setup will install two local demo DBs, ads and profiles.
> !intro
```

## Set up Kafka & Flink clusters
## Set up dev environment

The below setup will install a Kafka and Flink cluster within Kubernetes.
The below setup will create a dev environment with various resources within Kubernetes.

```
$ make install # build and install SQL CLI
@@ -60,6 +60,26 @@ The below setup will install a Kafka and Flink cluster within Kubernetes.
> !intro
```

The commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. can each be run in isolation to deploy an individual component, as shown below.
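
For example, to stand up only the Flink operator, session cluster, and SQL gateway:

```
$ make deploy-flink
```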

### Flink

```
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
basic-session-deployment-7b94b98b6b-d6jt5 1/1 Running 0 43s
```

Once the Flink deployment pod has STATUS 'Running', you can forward port 8081 and connect to http://localhost:8081/
to access the Flink dashboard.

```
$ kubectl port-forward basic-session-deployment-7b94b98b6b-d6jt5 8081 &
```
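
Since the pod name includes a generated hash, it may be easier to forward to the REST service
the Flink operator creates for the deployment (a sketch, assuming the operator's default
`<deployment-name>-rest` service naming):

```
$ kubectl port-forward svc/basic-session-deployment-rest 8081 &
```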

See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/)
for examples of running ad-hoc queries through Flink; a minimal sketch follows.
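
As a rough sketch of an ad-hoc query against the local gateway (assuming the REST endpoint is
reachable on port 8083, as configured in `deploy/docker/flink/docker-compose-sql-gateway.yaml`,
and that `jq` is installed):

```
# Open a session against the SQL gateway
SESSION=$(curl -s -X POST http://localhost:8083/v1/sessions \
  -H 'Content-Type: application/json' -d '{}' | jq -r .sessionHandle)

# Submit a statement; the gateway returns an operation handle
OP=$(curl -s -X POST "http://localhost:8083/v1/sessions/$SESSION/statements" \
  -H 'Content-Type: application/json' \
  -d '{"statement": "SELECT 1;"}' | jq -r .operationHandle)

# Fetch the first page of results
curl -s "http://localhost:8083/v1/sessions/$SESSION/operations/$OP/result/0"
```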

## The SQL CLI

The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.
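
A minimal session might look like this (a sketch; the available tables depend on which samples
you have deployed):

```
$ ./hoptimator
> !intro
> !tables
```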
12 changes: 12 additions & 0 deletions deploy/docker/flink/docker-compose-sql-gateway.yaml
@@ -0,0 +1,12 @@
services:
flink-sql-gateway:
image: flink:1.18.1
restart: unless-stopped
entrypoint: >
/bin/sh -c "./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost"
ports:
- 8083:8083
deploy:
resources:
limits:
memory: 1024M
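
To verify the gateway container is up, you can hit the REST API's info endpoint (a sketch,
assuming the gateway's default `/v1/info` endpoint):

```
$ curl http://localhost:8083/v1/info
```
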
2 changes: 1 addition & 1 deletion deploy/docker/venice/docker-compose-single-dc-setup.yaml
@@ -78,7 +78,7 @@ services:
hostname: venice-client
tty: true
volumes:
- ./venice:/opt/venice/schemas
- ./schemas:/opt/venice/schemas
depends_on:
venice-router:
condition: service_healthy
36 changes: 0 additions & 36 deletions deploy/samples/flink.yaml

This file was deleted.

18 changes: 18 additions & 0 deletions deploy/samples/flinkDeployment.yaml
@@ -0,0 +1,18 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-session-deployment
spec:
image: flink:1.18
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
22 changes: 22 additions & 0 deletions deploy/samples/flinkSessionJob.yaml
@@ -0,0 +1,22 @@
## This template adds Flink support.

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: JobTemplate
metadata:
name: flink-template
spec:
yaml: |
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: {{name}}
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- {{flinksql}}
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
24 changes: 12 additions & 12 deletions gradle/libs.versions.toml
@@ -4,24 +4,24 @@ avro = "org.apache.avro:avro:1.10.2"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.23.0"
calcite-core = "org.apache.calcite:calcite-core:1.34.0"
calcite-server = "org.apache.calcite:calcite-server:1.34.0"
flink-clients = "org.apache.flink:flink-clients:1.16.2"
flink-connector-base = "org.apache.flink:flink-connector-base:1.16.2"
flink-core = "org.apache.flink:flink-core:1.16.2"
flink-csv = "org.apache.flink:flink-csv:1.16.2"
flink-streaming-java = "org.apache.flink:flink-streaming-java:1.16.2"
flink-table-api-java = "org.apache.flink:flink-table-api-java:1.16.2"
flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.16.2"
flink-table-common = "org.apache.flink:flink-table-common:1.16.2"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.16.2"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.16.2"
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:1.16.2"
flink-clients = "org.apache.flink:flink-clients:1.18.1"
flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
flink-core = "org.apache.flink:flink-core:1.18.1"
flink-csv = "org.apache.flink:flink-csv:1.18.1"
flink-streaming-java = "org.apache.flink:flink-streaming-java:1.18.1"
flink-table-api-java = "org.apache.flink:flink-table-api-java:1.18.1"
flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.18.1"
flink-table-common = "org.apache.flink:flink-table-common:1.18.1"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.18.1"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.18.1"
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:3.2.0-1.18"
flink-connector-mysql-cdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javax-annotation-api = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
kafka-clients = "org.apache.kafka:kafka-clients:2.7.1"
kafka-clients = "org.apache.kafka:kafka-clients:3.2.0"
kubernetes-client = "io.kubernetes:client-java:16.0.2"
kubernetes-extended-client = "io.kubernetes:client-java-extended:16.0.2"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
@@ -6,7 +6,7 @@ metadata:
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
@@ -6,7 +6,7 @@ metadata:
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
2 changes: 2 additions & 0 deletions hoptimator-flink-runner/Dockerfile-flink-operator
@@ -0,0 +1,2 @@
FROM apache/flink-kubernetes-operator:1.9.0
COPY ./build/libs/hoptimator-flink-runner-all.jar /opt/hoptimator-flink-runner.jar
hoptimator-flink-runner/Dockerfile-flink-runner
@@ -1,2 +1,2 @@
FROM flink:1.16
FROM flink:1.18.1
COPY ./build/libs/hoptimator-flink-runner-all.jar /opt/hoptimator-flink-runner.jar
17 changes: 2 additions & 15 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
@@ -3,24 +3,11 @@

insert into kafka."existing-topic-1" select * from kafka."existing-topic-2";
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
kind: FlinkSessionJob
(Review comment from a collaborator on this line: "neat!")

metadata:
name: kafka-database-existing-topic-1
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
deploymentName: basic-session-deployment-example
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
21 changes: 4 additions & 17 deletions hoptimator-venice/src/test/resources/venice-ddl-insert-all.id
@@ -3,29 +3,16 @@

insert into "VENICE-CLUSTER0"."test-store-1" select * from "VENICE-CLUSTER0"."test-store";
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
kind: FlinkSessionJob
metadata:
name: venice-cluster0-test-store-1
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
23 changes: 5 additions & 18 deletions hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id
@@ -3,30 +3,17 @@

insert into "VENICE-CLUSTER0"."test-store-1" ("KEY$id", "intField") select "KEY$id", "stringField" from "VENICE-CLUSTER0"."test-store";
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
kind: FlinkSessionJob
metadata:
name: venice-cluster0-test-store-1
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS INTEGER) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless