Fix various issues with SQL generation, flink job routing, integration-tests #86

Merged · 3 commits · Jan 16, 2025
32 changes: 18 additions & 14 deletions .github/workflows/integration-tests.yml
@@ -33,24 +33,28 @@ jobs:
kind load docker-image hoptimator
kind load docker-image hoptimator-flink-runner
kind load docker-image hoptimator-flink-operator
- name: Deploy Dev Environment
run: make deploy-dev-environment
- name: Deploy Hoptimator
run: make deploy
- name: Deploy Samples
run: make deploy-samples
- name: Wait for Readiness
run: kubectl wait kafka/one --for=condition=Ready --timeout=10m -n kafka
- name: Run Integration Tests
run: make integration-tests
run: make integration-tests-kind
- name: Capture Integration Reports
if: failure()
uses: actions/[email protected]
with:
name: reports
path: |
**/build/reports/tests/intTest/
- name: Capture Cluster State
if: always()
run: |
kubectl describe pods
kubectl describe deployments
kubectl describe kafkas -n kafka
kubectl describe flinkdeployments
kubectl describe subscriptions
kubectl get pods
kubectl get svc
kubectl get deployments
kubectl get pods -n kafka
kubectl get svc -n kafka
kubectl get deployments -n kafka
kubectl get kafkas -n kafka
kubectl get flinkdeployments
kubectl get subscriptions
kubectl get pipelines
- name: Capture Flink Job Logs
if: always()
run: |
1 change: 1 addition & 0 deletions Dockerfile
@@ -2,5 +2,6 @@ FROM eclipse-temurin:18
WORKDIR /home/
ADD ./hoptimator-operator-integration/build/distributions/hoptimator-operator-integration.tar ./
ADD ./etc/* ./
ENV POD_NAMESPACE_FILEPATH=/var/run/secrets/kubernetes.io/serviceaccount/namespace
ENTRYPOINT ["/bin/sh", "-c"]

19 changes: 13 additions & 6 deletions Makefile
@@ -25,8 +25,10 @@ undeploy-config:
deploy: deploy-config
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy
kubectl apply -f ./deploy/dev/rbac.yaml

undeploy: undeploy-config
kubectl delete -f ./deploy/dev/rbac.yaml || echo "skipping"
kubectl delete -f ./deploy || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"

@@ -64,16 +66,15 @@ undeploy-flink:
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"
helm repo remove flink-operator-repo || echo "skipping"
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

deploy-kafka: deploy deploy-flink
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy/dev
kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/kafkadb.yaml
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
@@ -83,7 +84,6 @@ undeploy-kafka:
kubectl delete -f ./deploy/samples/kafkadb.yaml || echo "skipping"
kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"
kubectl delete -f ./deploy/dev || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
kubectl delete namespace kafka || echo "skipping"

# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
@@ -101,15 +101,22 @@ deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice

undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy

# Integration tests expect K8s, Kafka, and Venice to be running
# Integration test setup intended to be run locally
integration-tests: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
./gradlew intTest || kill `cat port-forward.pid`
kill `cat port-forward.pid`

# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
integration-tests-kind: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
./gradlew intTest

generate-models:
./generate-models.sh
./hoptimator-models/generate-models.sh # <-- marked for deletion
@@ -118,4 +125,4 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests deploy-dev-environment undeploy-dev-environment generate-models release
.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests integration-tests-kind deploy-dev-environment undeploy-dev-environment generate-models release
31 changes: 30 additions & 1 deletion README.md
@@ -55,13 +55,22 @@ The below setup will create a dev environment with various resources within Kubernetes.
```
$ make install # build and install SQL CLI
$ make deploy-dev-environment # start all local dev setups
$ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & # forward external Kafka port for use by SQL CLI
$ kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & # forward external Kafka port for use by SQL CLI
$ ./hoptimator # start the SQL CLI
> !intro
```

Commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. exist in isolation to deploy individual components.
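For example, a Kafka-only setup can be brought up and made reachable from the SQL CLI using the targets and service names referenced above; a minimal sketch:

```
$ make deploy-kafka                                                      # Strimzi operator, the `one` Kafka cluster, and the sample demodb/kafkadb
$ kubectl wait kafka/one --for=condition=Ready --timeout=10m -n kafka    # wait for brokers to come up
$ kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 &  # expose the external bootstrap listener locally
```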

### Kafka

To produce/consume Kafka data, use the following commands:

```
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
$ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
```

### Flink

```
@@ -80,6 +89,26 @@ to access the Flink dashboard.
See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/)
for sample adhoc queries through Flink.

To push a Flink job directly to the Flink deployment created above, `kubectl apply` the following yaml:
```
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: test-flink-session-job
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `datagen-table` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='datagen', 'number-of-rows'='10');
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'topic'='existing-topic-1', 'value.format'='json');
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `datagen-table`;
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
```
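A rough way to submit and check the job, assuming the yaml above is saved as `flink-session-job.yaml` (the file name is arbitrary):

```
$ kubectl apply -f flink-session-job.yaml   # create the FlinkSessionJob
$ kubectl get flinksessionjobs              # the job should appear alongside basic-session-deployment
$ kubectl get flinkdeployments              # session cluster status, as also captured by the integration workflow
```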

## The SQL CLI

The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.
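A typical first session might look like the sketch below; `!intro` is Hoptimator-specific, while `!tables` and `!quit` are standard sqlline commands:

```
$ ./hoptimator
> !intro     # list the Hoptimator-specific commands
> !tables    # standard sqlline: list tables visible through the configured databases
> !quit
```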
7 changes: 6 additions & 1 deletion deploy/dev/kafka.yaml
@@ -30,6 +30,8 @@ spec:
port: 9094
type: internal
tls: false
configuration:
useServiceDnsDomain: true
- name: tls
port: 9093
type: internal
@@ -39,9 +41,12 @@ spec:
type: nodeport
tls: false
configuration:
bootstrap:
nodePort: 31092
brokers:
- broker: 0
advertisedHost: localhost
advertisedHost: 127.0.0.1
nodePort: 31234
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
4 changes: 2 additions & 2 deletions deploy/dev/rbac.yaml
@@ -1,5 +1,5 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
kind: ClusterRoleBinding
metadata:
name: hoptimator-operator
namespace: default
@@ -8,6 +8,6 @@ subjects:
name: hoptimator-operator
namespace: default
roleRef:
kind: Role
kind: ClusterRole
name: hoptimator-operator
apiGroup: rbac.authorization.k8s.io
2 changes: 2 additions & 0 deletions deploy/docker/venice/docker-compose-single-dc-setup.yaml
@@ -16,6 +16,8 @@ services:
hostname: kafka
environment:
- ZOOKEEPER_ADDRESS=zookeeper:2181
ports:
- 9095:9092
depends_on:
zookeeper:
condition: service_healthy
8 changes: 4 additions & 4 deletions deploy/rbac.yaml
@@ -1,16 +1,16 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
kind: ClusterRole
metadata:
namespace: default
name: hoptimator-operator
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["acls", "kafkatopics", "subscriptions", "sqljobs"]
resources: ["acls", "kafkatopics", "subscriptions", "sqljobs", "pipelines"]
verbs: ["get", "watch", "list", "update", "create"]
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status", "pipelines/status"]
verbs: ["get", "patch"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments"]
resources: ["flinkdeployments", "flinksessionjobs"]
verbs: ["get", "update", "create"]

5 changes: 3 additions & 2 deletions deploy/samples/flinkDeployment.yaml
@@ -3,10 +3,11 @@ kind: FlinkDeployment
metadata:
name: basic-session-deployment
spec:
image: flink:1.18
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
taskmanager.numberOfTaskSlots: "3"
serviceAccount: flink
jobManager:
resource:
4 changes: 3 additions & 1 deletion deploy/samples/kafkadb.yaml
@@ -34,7 +34,9 @@ spec:
connector: |
connector = kafka
topic = {{table}}
properties.bootstrap.servers = localhost:9092
properties.bootstrap.servers = one-kafka-bootstrap.kafka.svc.cluster.local:9094
value.format = json
scan.startup.mode = earliest-offset

---

12 changes: 12 additions & 0 deletions etc/cluster.yaml
@@ -2,5 +2,17 @@ kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
# Additional worker nodes will not add any real resource capacity. However,
# it should mean that there are more CPU reservations to hand out.
networking:
ipFamily: ipv4
apiServerAddress: 127.0.0.1
nodes:
- role: control-plane
extraPortMappings:
this seems important :)

- containerPort: 31092
hostPort: 9092
listenAddress: "127.0.0.1"
protocol: TCP
- containerPort: 31234
hostPort: 31234
listenAddress: "127.0.0.1"
protocol: TCP
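For context on the mappings above: nodePort 31092 is the external bootstrap nodeport configured in deploy/dev/kafka.yaml, so publishing it on hostPort 9092 lets clients on the host reach the brokers at 127.0.0.1:9092 without a kubectl port-forward. A quick connectivity sketch, assuming kcat is installed on the host:

```
# List broker metadata through the kind host-port mapping (127.0.0.1:9092 -> nodePort 31092)
$ kcat -b 127.0.0.1:9092 -L
```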
@@ -4,6 +4,7 @@
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Optional;
@@ -14,6 +15,7 @@
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.KubeConfig;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
@@ -101,13 +103,26 @@ public static void useContext(K8sContext context) {
}

static K8sContext defaultContext() throws IOException {
File file = Paths.get(System.getProperty("user.home"), ".kube", "config").toFile();
try (Reader r = Files.newBufferedReader(file.toPath())) {
KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r);
kubeConfig.setFile(file);
ApiClient apiClient = ClientBuilder.kubeconfig(kubeConfig).build();
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default");
return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient);
Path path = Paths.get(System.getProperty("user.home"), ".kube", "config");
if (Files.exists(path)) {
File file = path.toFile();
try (Reader r = Files.newBufferedReader(file.toPath())) {
KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r);
kubeConfig.setFile(file);
ApiClient apiClient = ClientBuilder.kubeconfig(kubeConfig).build();
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default");
return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient);
}
} else {
ApiClient apiClient = Config.defaultClient();
String filePath = System.getenv("POD_NAMESPACE_FILEPATH");
String namespace;
if (filePath == null) {
namespace = "default";
} else {
namespace = new String(Files.readAllBytes(Paths.get(filePath)));
}
return new K8sContext("default", namespace, apiClient);
wow totally missed this

}
}
}
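The in-cluster fallback above pairs with the POD_NAMESPACE_FILEPATH variable added to the Dockerfile. A rough way to confirm the namespace file resolves inside a running operator pod (the deploy/hoptimator-operator name is an assumption; substitute your actual deployment):

```
# Print the namespace file the operator falls back to when no kubeconfig is present
$ kubectl exec deploy/hoptimator-operator -- sh -c 'echo "$POD_NAMESPACE_FILEPATH"; cat "$POD_NAMESPACE_FILEPATH"'
```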
10 changes: 5 additions & 5 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
@@ -7,14 +7,14 @@ kind: FlinkSessionJob
metadata:
name: kafka-database-existing-topic-1
spec:
deploymentName: basic-session-deployment-example
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1')
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
jarURI: local:///opt/hoptimator-flink-runner.jar
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.format'='json')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.format'='json')
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `existing-topic-2`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
2 changes: 1 addition & 1 deletion hoptimator-operator-integration/build.gradle
@@ -7,7 +7,7 @@ plugins {
dependencies {
implementation project(':hoptimator-operator')
implementation libs.slf4j.simple

testImplementation libs.junit
testImplementation libs.assertj
}
@@ -16,7 +16,7 @@


public class PipelineOperatorApp {
private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class);
private static final Logger log = LoggerFactory.getLogger(PipelineOperatorApp.class);

public static void main(String[] args) throws Exception {
new PipelineOperatorApp().run();