
Commit

Fix various issues with SQL generation, flink job routing, integration-tests (#86)

* Fix Flink SQL syntax and ability to push jobs to session cluster

* Update integration-test step

* Fix hoptimator-operator crashing issue
jogrogan authored Jan 16, 2025
1 parent 7c0455f commit c39e65e
Showing 19 changed files with 146 additions and 54 deletions.
32 changes: 18 additions & 14 deletions .github/workflows/integration-tests.yml
@@ -33,24 +33,28 @@ jobs:
kind load docker-image hoptimator
kind load docker-image hoptimator-flink-runner
kind load docker-image hoptimator-flink-operator
- name: Deploy Dev Environment
run: make deploy-dev-environment
- name: Deploy Hoptimator
run: make deploy
- name: Deploy Samples
run: make deploy-samples
- name: Wait for Readiness
run: kubectl wait kafka/one --for=condition=Ready --timeout=10m -n kafka
- name: Run Integration Tests
run: make integration-tests
run: make integration-tests-kind
- name: Capture Integration Reports
if: failure()
uses: actions/[email protected]
with:
name: reports
path: |
**/build/reports/tests/intTest/
- name: Capture Cluster State
if: always()
run: |
kubectl describe pods
kubectl describe deployments
kubectl describe kafkas -n kafka
kubectl describe flinkdeployments
kubectl describe subscriptions
kubectl get pods
kubectl get svc
kubectl get deployments
kubectl get pods -n kafka
kubectl get svc -n kafka
kubectl get deployments -n kafka
kubectl get kafkas -n kafka
kubectl get flinkdeployments
kubectl get subscriptions
kubectl get pipelines
- name: Capture Flink Job Logs
if: always()
run: |
1 change: 1 addition & 0 deletions Dockerfile
@@ -2,5 +2,6 @@ FROM eclipse-temurin:18
WORKDIR /home/
ADD ./hoptimator-operator-integration/build/distributions/hoptimator-operator-integration.tar ./
ADD ./etc/* ./
ENV POD_NAMESPACE_FILEPATH=/var/run/secrets/kubernetes.io/serviceaccount/namespace
ENTRYPOINT ["/bin/sh", "-c"]

19 changes: 13 additions & 6 deletions Makefile
@@ -25,8 +25,10 @@ undeploy-config:
deploy: deploy-config
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy
kubectl apply -f ./deploy/dev/rbac.yaml

undeploy: undeploy-config
kubectl delete -f ./deploy/dev/rbac.yaml || echo "skipping"
kubectl delete -f ./deploy || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"

@@ -64,16 +66,15 @@ undeploy-flink:
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"
helm repo remove flink-operator-repo || echo "skipping"
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

deploy-kafka: deploy deploy-flink
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy/dev
kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/kafkadb.yaml
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
@@ -83,7 +84,6 @@ undeploy-kafka:
kubectl delete -f ./deploy/samples/kafkadb.yaml || echo "skipping"
kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"
kubectl delete -f ./deploy/dev || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
kubectl delete namespace kafka || echo "skipping"

# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
@@ -101,15 +101,22 @@ deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice

undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy

# Integration tests expect K8s, Kafka, and Venice to be running
# Integration test setup intended to be run locally
integration-tests: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
./gradlew intTest || kill `cat port-forward.pid`
kill `cat port-forward.pid`

# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
integration-tests-kind: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
./gradlew intTest

generate-models:
./generate-models.sh
./hoptimator-models/generate-models.sh # <-- marked for deletion
@@ -118,4 +125,4 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests deploy-dev-environment undeploy-dev-environment generate-models release
.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests integration-tests-kind deploy-dev-environment undeploy-dev-environment generate-models release
31 changes: 30 additions & 1 deletion README.md
@@ -55,13 +55,22 @@ The below setup will create a dev environment with various resources within Kubernetes.
```
$ make install # build and install SQL CLI
$ make deploy-dev-environment # start all local dev setups
$ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & # forward external Kafka port for use by SQL CLI
$ kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & # forward external Kafka port for use by SQL CLI
$ ./hoptimator # start the SQL CLI
> !intro
```

Commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. exist in isolation to deploy individual components.
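
For example, individual pieces can be cycled without redeploying the whole environment (a sketch; the targets are those defined in the Makefile above):

```
$ make deploy-kafka             # install Strimzi and the sample Kafka cluster
$ make undeploy-kafka           # tear it back down
$ make deploy-dev-environment   # or redeploy everything at once
```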

### Kafka

To produce/consume Kafka data, use the following commands:

```
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
$ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
```

### Flink

```
@@ -80,6 +89,26 @@ to access the Flink dashboard.
See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/)
for sample adhoc queries through Flink.
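
As a rough sketch of an adhoc query over the gateway's REST API (assuming a SQL Gateway has been started against the session cluster and its REST endpoint, port 8083 by default, is reachable locally; neither is set up by the Makefile targets above):

```
$ curl -s -X POST http://localhost:8083/v1/sessions -H 'Content-Type: application/json' -d '{}'
  # => {"sessionHandle": "<session>"}
$ curl -s -X POST http://localhost:8083/v1/sessions/<session>/statements -H 'Content-Type: application/json' -d '{"statement": "SELECT 1"}'
  # => {"operationHandle": "<operation>"}
$ curl -s http://localhost:8083/v1/sessions/<session>/operations/<operation>/result/0
```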

To push a Flink job directly to the Flink deployment created above, `kubectl apply` the following yaml:
```
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: test-flink-session-job
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `datagen-table` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='datagen', 'number-of-rows'='10');
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'topic'='existing-topic-1', 'value.format'='json');
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `datagen-table`;
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
```
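
Once applied, the job shows up as a `FlinkSessionJob` resource (a sketch; the file name is hypothetical, and the job name matches `metadata.name` above):

```
$ kubectl apply -f flink-session-job.yaml
$ kubectl get flinksessionjobs
$ kubectl describe flinksessionjob test-flink-session-job
```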

## The SQL CLI

The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.
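
A minimal session might look like the following (a sketch; `!intro` is the Hoptimator-specific command mentioned above, while `!tables` and `!quit` are standard sqlline commands):

```
$ ./hoptimator
> !intro
> !tables
> !quit
```
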
7 changes: 6 additions & 1 deletion deploy/dev/kafka.yaml
@@ -30,6 +30,8 @@ spec:
port: 9094
type: internal
tls: false
configuration:
useServiceDnsDomain: true
- name: tls
port: 9093
type: internal
@@ -39,9 +41,12 @@ spec:
type: nodeport
tls: false
configuration:
bootstrap:
nodePort: 31092
brokers:
- broker: 0
advertisedHost: localhost
advertisedHost: 127.0.0.1
nodePort: 31234
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
4 changes: 2 additions & 2 deletions deploy/dev/rbac.yaml
@@ -1,5 +1,5 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
kind: ClusterRoleBinding
metadata:
name: hoptimator-operator
namespace: default
@@ -8,6 +8,6 @@ subjects:
name: hoptimator-operator
namespace: default
roleRef:
kind: Role
kind: ClusterRole
name: hoptimator-operator
apiGroup: rbac.authorization.k8s.io
2 changes: 2 additions & 0 deletions deploy/docker/venice/docker-compose-single-dc-setup.yaml
@@ -16,6 +16,8 @@ services:
hostname: kafka
environment:
- ZOOKEEPER_ADDRESS=zookeeper:2181
ports:
- 9095:9092
depends_on:
zookeeper:
condition: service_healthy
8 changes: 4 additions & 4 deletions deploy/rbac.yaml
@@ -1,16 +1,16 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
kind: ClusterRole
metadata:
namespace: default
name: hoptimator-operator
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["acls", "kafkatopics", "subscriptions", "sqljobs"]
resources: ["acls", "kafkatopics", "subscriptions", "sqljobs", "pipelines"]
verbs: ["get", "watch", "list", "update", "create"]
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status", "pipelines/status"]
verbs: ["get", "patch"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments"]
resources: ["flinkdeployments", "flinksessionjobs"]
verbs: ["get", "update", "create"]

5 changes: 3 additions & 2 deletions deploy/samples/flinkDeployment.yaml
@@ -3,10 +3,11 @@ kind: FlinkDeployment
metadata:
name: basic-session-deployment
spec:
image: flink:1.18
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
taskmanager.numberOfTaskSlots: "3"
serviceAccount: flink
jobManager:
resource:
4 changes: 3 additions & 1 deletion deploy/samples/kafkadb.yaml
@@ -34,7 +34,9 @@ spec:
connector: |
connector = kafka
topic = {{table}}
properties.bootstrap.servers = localhost:9092
properties.bootstrap.servers = one-kafka-bootstrap.kafka.svc.cluster.local:9094
value.format = json
scan.startup.mode = earliest-offset
---

12 changes: 12 additions & 0 deletions etc/cluster.yaml
@@ -2,5 +2,17 @@ kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
# Additional worker nodes will not add any real resource capacity. However,
# it should mean that there are more CPU reservations to hand out.
networking:
ipFamily: ipv4
apiServerAddress: 127.0.0.1
nodes:
- role: control-plane
extraPortMappings:
- containerPort: 31092
hostPort: 9092
listenAddress: "127.0.0.1"
protocol: TCP
- containerPort: 31234
hostPort: 31234
listenAddress: "127.0.0.1"
protocol: TCP
@@ -4,6 +4,7 @@
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Optional;
@@ -14,6 +15,7 @@
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.KubeConfig;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
@@ -101,13 +103,26 @@ public static void useContext(K8sContext context) {
}

static K8sContext defaultContext() throws IOException {
File file = Paths.get(System.getProperty("user.home"), ".kube", "config").toFile();
try (Reader r = Files.newBufferedReader(file.toPath())) {
KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r);
kubeConfig.setFile(file);
ApiClient apiClient = ClientBuilder.kubeconfig(kubeConfig).build();
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default");
return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient);
Path path = Paths.get(System.getProperty("user.home"), ".kube", "config");
if (Files.exists(path)) {
File file = path.toFile();
try (Reader r = Files.newBufferedReader(file.toPath())) {
KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r);
kubeConfig.setFile(file);
ApiClient apiClient = ClientBuilder.kubeconfig(kubeConfig).build();
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default");
return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient);
}
} else {
ApiClient apiClient = Config.defaultClient();
String filePath = System.getenv("POD_NAMESPACE_FILEPATH");
String namespace;
if (filePath == null) {
namespace = "default";
} else {
namespace = new String(Files.readAllBytes(Paths.get(filePath)));
}
return new K8sContext("default", namespace, apiClient);
}
}
}
10 changes: 5 additions & 5 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
@@ -7,14 +7,14 @@ kind: FlinkSessionJob
metadata:
name: kafka-database-existing-topic-1
spec:
deploymentName: basic-session-deployment-example
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1')
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
jarURI: local:///opt/hoptimator-flink-runner.jar
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.format'='json')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.format'='json')
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `existing-topic-2`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
2 changes: 1 addition & 1 deletion hoptimator-operator-integration/build.gradle
@@ -7,7 +7,7 @@ plugins {
dependencies {
implementation project(':hoptimator-operator')
implementation libs.slf4j.simple

testImplementation libs.junit
testImplementation libs.assertj
}
@@ -16,7 +16,7 @@


public class PipelineOperatorApp {
private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class);
private static final Logger log = LoggerFactory.getLogger(PipelineOperatorApp.class);

public static void main(String[] args) throws Exception {
new PipelineOperatorApp().run();